From 92242c7c07d7d9f5aea2111b548a3355f3633a7d Mon Sep 17 00:00:00 2001 From: YanTangZhai Date: Tue, 2 Dec 2014 18:57:59 +0800 Subject: [PATCH 01/17] Update HiveQl.scala --- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index b9283f668a9b5..bc6d87cd491f2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -1025,6 +1025,7 @@ private[hive] object HiveQl { case Token(AND(), left :: right:: Nil) => And(nodeToExpr(left), nodeToExpr(right)) case Token(OR(), left :: right:: Nil) => Or(nodeToExpr(left), nodeToExpr(right)) case Token(NOT(), child :: Nil) => Not(nodeToExpr(child)) + case Token("!", child :: Nil) => Not(nodeToExpr(child)) /* Case statements */ case Token("TOK_FUNCTION", Token(WHEN(), Nil) :: branches) => From 74175b484727b63e2805b9ef2c8f208f537552e5 Mon Sep 17 00:00:00 2001 From: YanTangZhai Date: Wed, 3 Dec 2014 11:36:37 +0800 Subject: [PATCH 02/17] Update HiveQuerySuite.scala --- .../apache/spark/sql/hive/execution/HiveQuerySuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index af45dfd6e28c2..173ef24ee0abc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -413,6 +413,14 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { createQueryTest("select null from table", "SELECT null FROM src LIMIT 1") + createQueryTest("! boolean logic operator", + """ + |SELECT a FROM ( + | SELECT 1 AS a FROM src LIMIT 1 UNION ALL + | SELECT 2 AS a FROM src LIMIT 1) table + |WHERE !(a>1) + """.stripMargin) + test("implement identity function using case statement") { val actual = sql("SELECT (CASE key WHEN key THEN key END) FROM src") .map { case Row(i: Int) => i } From 950b21e7c4a57800f251b5bb1057df3148be591d Mon Sep 17 00:00:00 2001 From: YanTangZhai Date: Wed, 3 Dec 2014 13:31:17 +0800 Subject: [PATCH 03/17] Update HiveQuerySuite.scala --- .../sql/hive/execution/HiveQuerySuite.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 173ef24ee0abc..36624f9569123 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -413,13 +413,15 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { createQueryTest("select null from table", "SELECT null FROM src LIMIT 1") - createQueryTest("! boolean logic operator", - """ - |SELECT a FROM ( - | SELECT 1 AS a FROM src LIMIT 1 UNION ALL - | SELECT 2 AS a FROM src LIMIT 1) table - |WHERE !(a>1) - """.stripMargin) + test("! boolean logic operator") { + sql( + """ + |SELECT a FROM ( + | SELECT 1 AS a FROM src LIMIT 1 UNION ALL + | SELECT 2 AS a FROM src LIMIT 1) table + |WHERE !(a>1) + """.stripMargin).collect() + } test("implement identity function using case statement") { val actual = sql("SELECT (CASE key WHEN key THEN key END) FROM src") From 59e4de94d48a947c54adbbe353137c78ee0052bc Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Tue, 16 Dec 2014 21:06:59 -0800 Subject: [PATCH 04/17] make hive test --- .../golden/! operator-0-81d1a187c7f4a6337baf081510a5dc5e | 1 + .../apache/spark/sql/hive/execution/HiveQuerySuite.scala | 8 ++++++++ 2 files changed, 9 insertions(+) create mode 100644 sql/hive/src/test/resources/golden/! operator-0-81d1a187c7f4a6337baf081510a5dc5e diff --git a/sql/hive/src/test/resources/golden/! operator-0-81d1a187c7f4a6337baf081510a5dc5e b/sql/hive/src/test/resources/golden/! operator-0-81d1a187c7f4a6337baf081510a5dc5e new file mode 100644 index 0000000000000..d00491fd7e5bb --- /dev/null +++ b/sql/hive/src/test/resources/golden/! operator-0-81d1a187c7f4a6337baf081510a5dc5e @@ -0,0 +1 @@ +1 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 36624f9569123..ab531be2155b1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -56,6 +56,14 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { Locale.setDefault(originalLocale) } + createQueryTest("! operator", + """ + |SELECT a FROM ( + | SELECT 1 AS a FROM src LIMIT 1 UNION ALL + | SELECT 2 AS a FROM src LIMIT 1) table + |WHERE !(a>1) + """.stripMargin) + createQueryTest("constant object inspector for generic udf", """SELECT named_struct( lower("AA"), "10", From bd2c44452bb9244365bea0726b71e632e84c35ef Mon Sep 17 00:00:00 2001 From: YanTangZhai Date: Mon, 22 Dec 2014 13:31:51 +0800 Subject: [PATCH 05/17] Update HiveQuerySuite.scala --- .../spark/sql/hive/execution/HiveQuerySuite.scala | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index ab531be2155b1..9c34cb081679f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -421,16 +421,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { createQueryTest("select null from table", "SELECT null FROM src LIMIT 1") - test("! boolean logic operator") { - sql( - """ - |SELECT a FROM ( - | SELECT 1 AS a FROM src LIMIT 1 UNION ALL - | SELECT 2 AS a FROM src LIMIT 1) table - |WHERE !(a>1) - """.stripMargin).collect() - } - test("implement identity function using case statement") { val actual = sql("SELECT (CASE key WHEN key THEN key END) FROM src") .map { case Row(i: Int) => i } From efc42108cfdf8b7f2ba72892dfe0db6a6e3d0ecd Mon Sep 17 00:00:00 2001 From: YanTangZhai Date: Mon, 22 Dec 2014 14:20:05 +0800 Subject: [PATCH 06/17] Update HiveQuerySuite.scala --- .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 9c34cb081679f..75bacf6db6d11 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -61,7 +61,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { |SELECT a FROM ( | SELECT 1 AS a FROM src LIMIT 1 UNION ALL | SELECT 2 AS a FROM src LIMIT 1) table - |WHERE !(a>1) + |WHERE !(a > 1) """.stripMargin) createQueryTest("constant object inspector for generic udf", From 1e1ebb44d541433e8c13a556055dac089072634e Mon Sep 17 00:00:00 2001 From: YanTangZhai Date: Mon, 22 Dec 2014 17:24:09 +0800 Subject: [PATCH 07/17] Update HiveQuerySuite.scala --- .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 75bacf6db6d11..9c34cb081679f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -61,7 +61,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { |SELECT a FROM ( | SELECT 1 AS a FROM src LIMIT 1 UNION ALL | SELECT 2 AS a FROM src LIMIT 1) table - |WHERE !(a > 1) + |WHERE !(a>1) """.stripMargin) createQueryTest("constant object inspector for generic udf", From 5601a8b1458c9a7317a2e4e0463358f0a054c181 Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Thu, 25 Dec 2014 11:17:57 +0800 Subject: [PATCH 08/17] [SPARK-4961] [CORE] Put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index a157e36e2286e..36b08d1b91cd6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -190,7 +190,8 @@ class HadoopRDD[K, V]( newInputFormat } - override def getPartitions: Array[Partition] = { + @transient private val thesePartitions_ : Array[Partition] = { + val start = System.nanoTime val jobConf = getJobConf() // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) @@ -203,9 +204,12 @@ class HadoopRDD[K, V]( for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } + logDebug("Get these partitions took %f s".format((System.nanoTime - start) / 1e9)) array } + override def getPartitions: Array[Partition] = thesePartitions_ + override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { val iter = new NextIterator[(K, V)] { From af5abdacaf5637e05200216bd8dfcdcfe15b4e17 Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Thu, 25 Dec 2014 14:06:03 +0800 Subject: [PATCH 09/17] [SPARK-4961] [CORE] Put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time --- .../org/apache/spark/rdd/HadoopRDD.scala | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 36b08d1b91cd6..b7852c1e058b4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -190,7 +190,7 @@ class HadoopRDD[K, V]( newInputFormat } - @transient private val thesePartitions_ : Array[Partition] = { + protected def getThesePartitions() : Array[Partition] = { val start = System.nanoTime val jobConf = getJobConf() // add the credentials here as this can be called before SparkContext initialized @@ -208,7 +208,22 @@ class HadoopRDD[K, V]( array } - override def getPartitions: Array[Partition] = thesePartitions_ + @transient private var thesePartitions_ : Array[Partition] = { + try { + getThesePartitions() + } catch { + case e: Exception => + logDebug("Error initializing HadoopRDD's partitions", e) + null + } + } + + override def getPartitions: Array[Partition] = { + if (thesePartitions_ == null) { + thesePartitions_ = getThesePartitions() + } + thesePartitions_ + } override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { val iter = new NextIterator[(K, V)] { From 6e95955c9c67ce509372fe08f9ced962eb251593 Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Tue, 30 Dec 2014 20:39:53 +0800 Subject: [PATCH 10/17] [SPARK-4961] [CORE] Put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time --- .../org/apache/spark/rdd/BinaryFileRDD.scala | 2 ++ .../org/apache/spark/rdd/HadoopRDD.scala | 21 ++----------------- .../org/apache/spark/rdd/NewHadoopRDD.scala | 2 ++ .../main/scala/org/apache/spark/rdd/RDD.scala | 5 +---- 4 files changed, 7 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala index 1f755db485812..de3bbd662d233 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala @@ -33,6 +33,7 @@ private[spark] class BinaryFileRDD[T]( extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) { override def getPartitions: Array[Partition] = { + val start = System.nanoTime val inputFormat = inputFormatClass.newInstance inputFormat match { case configurable: Configurable => @@ -46,6 +47,7 @@ private[spark] class BinaryFileRDD[T]( for (i <- 0 until rawSplits.size) { result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) } + logDebug("Get these partitions took %f s".format((System.nanoTime - start) / 1e9)) result } } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index b7852c1e058b4..41233b92f2d1a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -190,7 +190,7 @@ class HadoopRDD[K, V]( newInputFormat } - protected def getThesePartitions() : Array[Partition] = { + override def getPartitions: Array[Partition] = { val start = System.nanoTime val jobConf = getJobConf() // add the credentials here as this can be called before SparkContext initialized @@ -207,24 +207,7 @@ class HadoopRDD[K, V]( logDebug("Get these partitions took %f s".format((System.nanoTime - start) / 1e9)) array } - - @transient private var thesePartitions_ : Array[Partition] = { - try { - getThesePartitions() - } catch { - case e: Exception => - logDebug("Error initializing HadoopRDD's partitions", e) - null - } - } - - override def getPartitions: Array[Partition] = { - if (thesePartitions_ == null) { - thesePartitions_ = getThesePartitions() - } - thesePartitions_ - } - + override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { val iter = new NextIterator[(K, V)] { diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index e55d03d391e03..6485a1fb35a9a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -88,6 +88,7 @@ class NewHadoopRDD[K, V]( @transient protected val jobId = new JobID(jobTrackerId, id) override def getPartitions: Array[Partition] = { + val start = System.nanoTime val inputFormat = inputFormatClass.newInstance inputFormat match { case configurable: Configurable => @@ -100,6 +101,7 @@ class NewHadoopRDD[K, V]( for (i <- 0 until rawSplits.size) { result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) } + logDebug("Get these partitions took %f s".format((System.nanoTime - start) / 1e9)) result } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index f47c2d1fcdcc7..e06f04c3d6908 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -178,7 +178,7 @@ abstract class RDD[T: ClassTag]( // Our dependencies and partitions will be gotten by calling subclass's methods below, and will // be overwritten when we're checkpointed private var dependencies_ : Seq[Dependency[_]] = null - @transient private var partitions_ : Array[Partition] = null + @transient private var partitions_ : Array[Partition] = getPartitions /** An Option holding our checkpoint RDD, if we are checkpointed */ private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD) @@ -202,9 +202,6 @@ abstract class RDD[T: ClassTag]( */ final def partitions: Array[Partition] = { checkpointRDD.map(_.partitions).getOrElse { - if (partitions_ == null) { - partitions_ = getPartitions - } partitions_ } } From fd87518d7f81de1a122cfad25a88956a596ccd4f Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Wed, 31 Dec 2014 19:12:52 +0800 Subject: [PATCH 11/17] [SPARK-4961] [CORE] Put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time --- .../scala/org/apache/spark/rdd/BinaryFileRDD.scala | 2 -- .../main/scala/org/apache/spark/rdd/HadoopRDD.scala | 4 +--- .../scala/org/apache/spark/rdd/NewHadoopRDD.scala | 2 -- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 13 ++++++++++++- .../org/apache/spark/scheduler/DAGScheduler.scala | 9 +++++++++ 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala index de3bbd662d233..1f755db485812 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala @@ -33,7 +33,6 @@ private[spark] class BinaryFileRDD[T]( extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) { override def getPartitions: Array[Partition] = { - val start = System.nanoTime val inputFormat = inputFormatClass.newInstance inputFormat match { case configurable: Configurable => @@ -47,7 +46,6 @@ private[spark] class BinaryFileRDD[T]( for (i <- 0 until rawSplits.size) { result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) } - logDebug("Get these partitions took %f s".format((System.nanoTime - start) / 1e9)) result } } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 41233b92f2d1a..081fcd7e21431 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -191,7 +191,6 @@ class HadoopRDD[K, V]( } override def getPartitions: Array[Partition] = { - val start = System.nanoTime val jobConf = getJobConf() // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) @@ -204,10 +203,9 @@ class HadoopRDD[K, V]( for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } - logDebug("Get these partitions took %f s".format((System.nanoTime - start) / 1e9)) array } - + override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { val iter = new NextIterator[(K, V)] { diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 6485a1fb35a9a..e55d03d391e03 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -88,7 +88,6 @@ class NewHadoopRDD[K, V]( @transient protected val jobId = new JobID(jobTrackerId, id) override def getPartitions: Array[Partition] = { - val start = System.nanoTime val inputFormat = inputFormatClass.newInstance inputFormat match { case configurable: Configurable => @@ -101,7 +100,6 @@ class NewHadoopRDD[K, V]( for (i <- 0 until rawSplits.size) { result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) } - logDebug("Get these partitions took %f s".format((System.nanoTime - start) / 1e9)) result } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index e06f04c3d6908..c799006f0c861 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -178,7 +178,7 @@ abstract class RDD[T: ClassTag]( // Our dependencies and partitions will be gotten by calling subclass's methods below, and will // be overwritten when we're checkpointed private var dependencies_ : Seq[Dependency[_]] = null - @transient private var partitions_ : Array[Partition] = getPartitions + @transient private var partitions_ : Array[Partition] = null /** An Option holding our checkpoint RDD, if we are checkpointed */ private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD) @@ -202,6 +202,9 @@ abstract class RDD[T: ClassTag]( */ final def partitions: Array[Partition] = { checkpointRDD.map(_.partitions).getOrElse { + if (partitions_ == null) { + partitions_ = getPartitions + } partitions_ } } @@ -1323,6 +1326,14 @@ abstract class RDD[T: ClassTag]( } } + /** + * Traverses the lineage chain and calls partitions on each RDD. + */ + private[spark] def doPreGetPartitions() { + dependencies.foreach(_.rdd.doPreGetPartitions()) + partitions + } + /** * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`) * created from the checkpoint file, and forget its old dependencies and partitions. diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index cb8ccfbdbdcbb..b76548e6666a5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -493,6 +493,15 @@ class DAGScheduler( } assert(partitions.size > 0) + /** + * Makes sure that doPreGetPartitions occurs before + * the job submitter sends a message into the DAGScheduler actor. + * Although getPartitions may be called in rdd.partitions.length + * before doPreGetPartitions occurs. + */ + val start = System.nanoTime + rdd.doPreGetPartitions() + logInfo("Get these partitions took %f s".format((System.nanoTime - start) / 1e9)) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) eventProcessActor ! JobSubmitted( From 74c1dec31ba9ded5a82640f7354aa6231169281c Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Wed, 31 Dec 2014 20:07:12 +0800 Subject: [PATCH 12/17] update HadoopRDD.scala --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 081fcd7e21431..a157e36e2286e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -205,7 +205,7 @@ class HadoopRDD[K, V]( } array } - + override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { val iter = new NextIterator[(K, V)] { From 09afdff3f8e511c947fc14cd4673b1629c105f41 Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Wed, 14 Jan 2015 17:34:21 +0800 Subject: [PATCH 13/17] [SPARK-4961] [CORE] Put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time --- .../main/scala/org/apache/spark/rdd/RDD.scala | 8 ---- .../apache/spark/scheduler/DAGScheduler.scala | 38 +++++++++++++++++-- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index c799006f0c861..f47c2d1fcdcc7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1326,14 +1326,6 @@ abstract class RDD[T: ClassTag]( } } - /** - * Traverses the lineage chain and calls partitions on each RDD. - */ - private[spark] def doPreGetPartitions() { - dependencies.foreach(_.rdd.doPreGetPartitions()) - partitions - } - /** * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`) * created from the checkpoint file, and forget its old dependencies and partitions. diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b76548e6666a5..5c23d12deeb23 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -315,6 +315,36 @@ class DAGScheduler( parents.toList } + /** + * Get partitions of parent rdds for a given RDD. + * Traverses the lineage chain and calls partitions on each RDD. + */ + private[scheduler] val shuffleToRDDParts = new HashSet[Int] + private def getParentPartitions(rdd: RDD[_]) = { + val visited = new HashSet[RDD[_]] + val waitingForVisit = new Stack[RDD[_]] + def visit(r: RDD[_]) { + if (!visited(r)) { + visited += r + for (dep <- r.dependencies) { + dep match { + case shuffleDep: ShuffleDependency[_, _, _] => + if (!shuffleToRDDParts(shuffleDep.shuffleId)) { + shuffleDep.rdd.partitions + shuffleToRDDParts += shuffleDep.shuffleId + } + case _ => + waitingForVisit.push(dep.rdd) + } + } + } + } + waitingForVisit.push(rdd) + while (!waitingForVisit.isEmpty) { + visit(waitingForVisit.pop()) + } + } + // Find ancestor missing shuffle dependencies and register into shuffleToMapStage private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) = { val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd) @@ -493,14 +523,14 @@ class DAGScheduler( } assert(partitions.size > 0) + /** - * Makes sure that doPreGetPartitions occurs before + * Makes sure that getPartitions occurs before * the job submitter sends a message into the DAGScheduler actor. - * Although getPartitions may be called in rdd.partitions.length - * before doPreGetPartitions occurs. + * Although getPartitions may be called in rdd.partitions.length before this. */ val start = System.nanoTime - rdd.doPreGetPartitions() + getParentPartitions(rdd) logInfo("Get these partitions took %f s".format((System.nanoTime - start) / 1e9)) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) From b535a531ee853c29d63cda0154be54512740bc78 Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Mon, 19 Jan 2015 14:55:06 +0800 Subject: [PATCH 14/17] [SPARK-4961] [CORE] Put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time --- .../apache/spark/scheduler/DAGScheduler.scala | 52 +++++-------------- 1 file changed, 13 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5c23d12deeb23..4f051b13703a2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -315,36 +315,6 @@ class DAGScheduler( parents.toList } - /** - * Get partitions of parent rdds for a given RDD. - * Traverses the lineage chain and calls partitions on each RDD. - */ - private[scheduler] val shuffleToRDDParts = new HashSet[Int] - private def getParentPartitions(rdd: RDD[_]) = { - val visited = new HashSet[RDD[_]] - val waitingForVisit = new Stack[RDD[_]] - def visit(r: RDD[_]) { - if (!visited(r)) { - visited += r - for (dep <- r.dependencies) { - dep match { - case shuffleDep: ShuffleDependency[_, _, _] => - if (!shuffleToRDDParts(shuffleDep.shuffleId)) { - shuffleDep.rdd.partitions - shuffleToRDDParts += shuffleDep.shuffleId - } - case _ => - waitingForVisit.push(dep.rdd) - } - } - } - } - waitingForVisit.push(rdd) - while (!waitingForVisit.isEmpty) { - visit(waitingForVisit.pop()) - } - } - // Find ancestor missing shuffle dependencies and register into shuffleToMapStage private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) = { val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd) @@ -523,17 +493,21 @@ class DAGScheduler( } assert(partitions.size > 0) - - /** - * Makes sure that getPartitions occurs before - * the job submitter sends a message into the DAGScheduler actor. - * Although getPartitions may be called in rdd.partitions.length before this. - */ - val start = System.nanoTime - getParentPartitions(rdd) - logInfo("Get these partitions took %f s".format((System.nanoTime - start) / 1e9)) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) + + // Makes sure that getPartitions occurs before + // the job submitter sends a message into the DAGScheduler actor. + // Although getPartitions may be called in rdd.partitions.length before this. + try { + getParentStages(rdd, jobId) + } catch { + case e: Exception => + logWarning("Get or create the list of parent stages failed due to exception - job: " + + jobId, e) + waiter.jobFailed(e) + return waiter + } eventProcessActor ! JobSubmitted( jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties) waiter From aed530b31481e3f8ed007ee0abf99a9180d4342d Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Tue, 20 Jan 2015 09:45:36 +0800 Subject: [PATCH 15/17] [SPARK-4961] [CORE] Put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4f051b13703a2..da5ab10d7cd8f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -500,10 +500,10 @@ class DAGScheduler( // the job submitter sends a message into the DAGScheduler actor. // Although getPartitions may be called in rdd.partitions.length before this. try { - getParentStages(rdd, jobId) + getParentStages(rdd, jobId).foreach(_.rdd.partitions) } catch { case e: Exception => - logWarning("Get or create the list of parent stages failed due to exception - job: " + logWarning("Get the partitions of parent stages' rdds failed due to exception - job: " + jobId, e) waiter.jobFailed(e) return waiter From 572079b1a7bfd9f24c0507da562d25f63dc93bdd Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Sat, 24 Jan 2015 15:45:16 +0800 Subject: [PATCH 16/17] update --- .../sql/hive/execution/HiveQuerySuite.scala | 894 ++++++++++++++++++ 1 file changed, 894 insertions(+) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala new file mode 100644 index 0000000000000..700a45edb11d6 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.io.File +import java.util.{Locale, TimeZone} + +import org.scalatest.BeforeAndAfter + +import scala.util.Try + +import org.apache.hadoop.hive.conf.HiveConf.ConfVars + +import org.apache.spark.{SparkFiles, SparkException} +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.hive._ +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.{SQLConf, Row, SchemaRDD} + +case class TestData(a: Int, b: String) + +/** + * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution. + */ +class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { + private val originalTimeZone = TimeZone.getDefault + private val originalLocale = Locale.getDefault + + override def beforeAll() { + TestHive.cacheTables = true + // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*) + TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) + // Add Locale setting + Locale.setDefault(Locale.US) + } + + override def afterAll() { + TestHive.cacheTables = false + TimeZone.setDefault(originalTimeZone) + Locale.setDefault(originalLocale) + } + + test("SPARK-4908: concurent hive native commands") { + (1 to 100).par.map { _ => + sql("USE default") + sql("SHOW TABLES") + } + } + + createQueryTest("! operator", + """ + |SELECT a FROM ( + | SELECT 1 AS a FROM src LIMIT 1 UNION ALL + | SELECT 2 AS a FROM src LIMIT 1) table + |WHERE !(a>1) + """.stripMargin) + + createQueryTest("constant object inspector for generic udf", + """SELECT named_struct( + lower("AA"), "10", + repeat(lower("AA"), 3), "11", + lower(repeat("AA", 3)), "12", + printf("Bb%d", 12), "13", + repeat(printf("s%d", 14), 2), "14") FROM src LIMIT 1""") + + createQueryTest("NaN to Decimal", + "SELECT CAST(CAST('NaN' AS DOUBLE) AS DECIMAL(1,1)) FROM src LIMIT 1") + + createQueryTest("constant null testing", + """SELECT + |IF(FALSE, CAST(NULL AS STRING), CAST(1 AS STRING)) AS COL1, + |IF(TRUE, CAST(NULL AS STRING), CAST(1 AS STRING)) AS COL2, + |IF(FALSE, CAST(NULL AS INT), CAST(1 AS INT)) AS COL3, + |IF(TRUE, CAST(NULL AS INT), CAST(1 AS INT)) AS COL4, + |IF(FALSE, CAST(NULL AS DOUBLE), CAST(1 AS DOUBLE)) AS COL5, + |IF(TRUE, CAST(NULL AS DOUBLE), CAST(1 AS DOUBLE)) AS COL6, + |IF(FALSE, CAST(NULL AS BOOLEAN), CAST(1 AS BOOLEAN)) AS COL7, + |IF(TRUE, CAST(NULL AS BOOLEAN), CAST(1 AS BOOLEAN)) AS COL8, + |IF(FALSE, CAST(NULL AS BIGINT), CAST(1 AS BIGINT)) AS COL9, + |IF(TRUE, CAST(NULL AS BIGINT), CAST(1 AS BIGINT)) AS COL10, + |IF(FALSE, CAST(NULL AS FLOAT), CAST(1 AS FLOAT)) AS COL11, + |IF(TRUE, CAST(NULL AS FLOAT), CAST(1 AS FLOAT)) AS COL12, + |IF(FALSE, CAST(NULL AS SMALLINT), CAST(1 AS SMALLINT)) AS COL13, + |IF(TRUE, CAST(NULL AS SMALLINT), CAST(1 AS SMALLINT)) AS COL14, + |IF(FALSE, CAST(NULL AS TINYINT), CAST(1 AS TINYINT)) AS COL15, + |IF(TRUE, CAST(NULL AS TINYINT), CAST(1 AS TINYINT)) AS COL16, + |IF(FALSE, CAST(NULL AS BINARY), CAST("1" AS BINARY)) AS COL17, + |IF(TRUE, CAST(NULL AS BINARY), CAST("1" AS BINARY)) AS COL18, + |IF(FALSE, CAST(NULL AS DATE), CAST("1970-01-01" AS DATE)) AS COL19, + |IF(TRUE, CAST(NULL AS DATE), CAST("1970-01-01" AS DATE)) AS COL20, + |IF(FALSE, CAST(NULL AS TIMESTAMP), CAST(1 AS TIMESTAMP)) AS COL21, + |IF(TRUE, CAST(NULL AS TIMESTAMP), CAST(1 AS TIMESTAMP)) AS COL22, + |IF(FALSE, CAST(NULL AS DECIMAL), CAST(1 AS DECIMAL)) AS COL23, + |IF(TRUE, CAST(NULL AS DECIMAL), CAST(1 AS DECIMAL)) AS COL24 + |FROM src LIMIT 1""".stripMargin) + + createQueryTest("constant array", + """ + |SELECT sort_array( + | sort_array( + | array("hadoop distributed file system", + | "enterprise databases", "hadoop map-reduce"))) + |FROM src LIMIT 1; + """.stripMargin) + + createQueryTest("count distinct 0 values", + """ + |SELECT COUNT(DISTINCT a) FROM ( + | SELECT 'a' AS a FROM src LIMIT 0) table + """.stripMargin) + + createQueryTest("count distinct 1 value strings", + """ + |SELECT COUNT(DISTINCT a) FROM ( + | SELECT 'a' AS a FROM src LIMIT 1 UNION ALL + | SELECT 'b' AS a FROM src LIMIT 1) table + """.stripMargin) + + createQueryTest("count distinct 1 value", + """ + |SELECT COUNT(DISTINCT a) FROM ( + | SELECT 1 AS a FROM src LIMIT 1 UNION ALL + | SELECT 1 AS a FROM src LIMIT 1) table + """.stripMargin) + + createQueryTest("count distinct 2 values", + """ + |SELECT COUNT(DISTINCT a) FROM ( + | SELECT 1 AS a FROM src LIMIT 1 UNION ALL + | SELECT 2 AS a FROM src LIMIT 1) table + """.stripMargin) + + createQueryTest("count distinct 2 values including null", + """ + |SELECT COUNT(DISTINCT a, 1) FROM ( + | SELECT 1 AS a FROM src LIMIT 1 UNION ALL + | SELECT 1 AS a FROM src LIMIT 1 UNION ALL + | SELECT null AS a FROM src LIMIT 1) table + """.stripMargin) + + createQueryTest("count distinct 1 value + null", + """ + |SELECT COUNT(DISTINCT a) FROM ( + | SELECT 1 AS a FROM src LIMIT 1 UNION ALL + | SELECT 1 AS a FROM src LIMIT 1 UNION ALL + | SELECT null AS a FROM src LIMIT 1) table + """.stripMargin) + + createQueryTest("count distinct 1 value long", + """ + |SELECT COUNT(DISTINCT a) FROM ( + | SELECT 1L AS a FROM src LIMIT 1 UNION ALL + | SELECT 1L AS a FROM src LIMIT 1) table + """.stripMargin) + + createQueryTest("count distinct 2 values long", + """ + |SELECT COUNT(DISTINCT a) FROM ( + | SELECT 1L AS a FROM src LIMIT 1 UNION ALL + | SELECT 2L AS a FROM src LIMIT 1) table + """.stripMargin) + + createQueryTest("count distinct 1 value + null long", + """ + |SELECT COUNT(DISTINCT a) FROM ( + | SELECT 1L AS a FROM src LIMIT 1 UNION ALL + | SELECT 1L AS a FROM src LIMIT 1 UNION ALL + | SELECT null AS a FROM src LIMIT 1) table + """.stripMargin) + + createQueryTest("null case", + "SELECT case when(true) then 1 else null end FROM src LIMIT 1") + + createQueryTest("single case", + """SELECT case when true then 1 else 2 end FROM src LIMIT 1""") + + createQueryTest("double case", + """SELECT case when 1 = 2 then 1 when 2 = 2 then 3 else 2 end FROM src LIMIT 1""") + + createQueryTest("case else null", + """SELECT case when 1 = 2 then 1 when 2 = 2 then 3 else null end FROM src LIMIT 1""") + + createQueryTest("having no references", + "SELECT key FROM src GROUP BY key HAVING COUNT(*) > 1") + + createQueryTest("boolean = number", + """ + |SELECT + | 1 = true, 1L = true, 1Y = true, true = 1, true = 1L, true = 1Y, + | 0 = true, 0L = true, 0Y = true, true = 0, true = 0L, true = 0Y, + | 1 = false, 1L = false, 1Y = false, false = 1, false = 1L, false = 1Y, + | 0 = false, 0L = false, 0Y = false, false = 0, false = 0L, false = 0Y, + | 2 = true, 2L = true, 2Y = true, true = 2, true = 2L, true = 2Y, + | 2 = false, 2L = false, 2Y = false, false = 2, false = 2L, false = 2Y + |FROM src LIMIT 1 + """.stripMargin) + + test("CREATE TABLE AS runs once") { + sql("CREATE TABLE foo AS SELECT 1 FROM src LIMIT 1").collect() + assert(sql("SELECT COUNT(*) FROM foo").collect().head.getLong(0) === 1, + "Incorrect number of rows in created table") + } + + createQueryTest("between", + "SELECT * FROM src WHERE key Between 1 and 2") + + createQueryTest("div", + "SELECT 1 DIV 2, 1 div 2, 1 dIv 2, 100 DIV 51, 100 DIV 49 FROM src LIMIT 1") + + // Jdk version leads to different query output for double, so not use createQueryTest here + test("division") { + val res = sql("SELECT 2 / 1, 1 / 2, 1 / 3, 1 / COUNT(*) FROM src LIMIT 1").collect().head + Seq(2.0, 0.5, 0.3333333333333333, 0.002).zip(res).foreach( x => + assert(x._1 == x._2.asInstanceOf[Double])) + } + + createQueryTest("modulus", + "SELECT 11 % 10, IF((101.1 % 100.0) BETWEEN 1.01 AND 1.11, \"true\", \"false\"), (101 / 2) % 10 FROM src LIMIT 1") + + test("Query expressed in SQL") { + setConf("spark.sql.dialect", "sql") + assert(sql("SELECT 1").collect() === Array(Seq(1))) + setConf("spark.sql.dialect", "hiveql") + } + + test("Query expressed in HiveQL") { + sql("FROM src SELECT key").collect() + } + + test("Query with constant folding the CAST") { + sql("SELECT CAST(CAST('123' AS binary) AS binary) FROM src LIMIT 1").collect() + } + + createQueryTest("Constant Folding Optimization for AVG_SUM_COUNT", + "SELECT AVG(0), SUM(0), COUNT(null), COUNT(value) FROM src GROUP BY key") + + createQueryTest("Cast Timestamp to Timestamp in UDF", + """ + | SELECT DATEDIFF(CAST(value AS timestamp), CAST('2002-03-21 00:00:00' AS timestamp)) + | FROM src LIMIT 1 + """.stripMargin) + + createQueryTest("Simple Average", + "SELECT AVG(key) FROM src") + + createQueryTest("Simple Average + 1", + "SELECT AVG(key) + 1.0 FROM src") + + createQueryTest("Simple Average + 1 with group", + "SELECT AVG(key) + 1.0, value FROM src group by value") + + createQueryTest("string literal", + "SELECT 'test' FROM src") + + createQueryTest("Escape sequences", + """SELECT key, '\\\t\\' FROM src WHERE key = 86""") + + createQueryTest("IgnoreExplain", + """EXPLAIN SELECT key FROM src""") + + createQueryTest("trivial join where clause", + "SELECT * FROM src a JOIN src b WHERE a.key = b.key") + + createQueryTest("trivial join ON clause", + "SELECT * FROM src a JOIN src b ON a.key = b.key") + + createQueryTest("small.cartesian", + "SELECT a.key, b.key FROM (SELECT key FROM src WHERE key < 1) a JOIN (SELECT key FROM src WHERE key = 2) b") + + createQueryTest("length.udf", + "SELECT length(\"test\") FROM src LIMIT 1") + + createQueryTest("partitioned table scan", + "SELECT ds, hr, key, value FROM srcpart") + + createQueryTest("hash", + "SELECT hash('test') FROM src LIMIT 1") + + createQueryTest("create table as", + """ + |CREATE TABLE createdtable AS SELECT * FROM src; + |SELECT * FROM createdtable + """.stripMargin) + + createQueryTest("create table as with db name", + """ + |CREATE DATABASE IF NOT EXISTS testdb; + |CREATE TABLE testdb.createdtable AS SELECT * FROM default.src; + |SELECT * FROM testdb.createdtable; + |DROP DATABASE IF EXISTS testdb CASCADE + """.stripMargin) + + createQueryTest("insert table with db name", + """ + |CREATE DATABASE IF NOT EXISTS testdb; + |CREATE TABLE testdb.createdtable like default.src; + |INSERT INTO TABLE testdb.createdtable SELECT * FROM default.src; + |SELECT * FROM testdb.createdtable; + |DROP DATABASE IF EXISTS testdb CASCADE + """.stripMargin) + + createQueryTest("insert into and insert overwrite", + """ + |CREATE TABLE createdtable like src; + |INSERT INTO TABLE createdtable SELECT * FROM src; + |INSERT INTO TABLE createdtable SELECT * FROM src1; + |SELECT * FROM createdtable; + |INSERT OVERWRITE TABLE createdtable SELECT * FROM src WHERE key = 86; + |SELECT * FROM createdtable; + """.stripMargin) + + createQueryTest("transform", + "SELECT TRANSFORM (key) USING 'cat' AS (tKey) FROM src") + + createQueryTest("LIKE", + "SELECT * FROM src WHERE value LIKE '%1%'") + + createQueryTest("DISTINCT", + "SELECT DISTINCT key, value FROM src") + + createQueryTest("empty aggregate input", + "SELECT SUM(key) FROM (SELECT * FROM src LIMIT 0) a") + + createQueryTest("lateral view1", + "SELECT tbl.* FROM src LATERAL VIEW explode(array(1,2)) tbl as a") + + createQueryTest("lateral view2", + "SELECT * FROM src LATERAL VIEW explode(array(1,2)) tbl") + + + createQueryTest("lateral view3", + "FROM src SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX") + + createQueryTest("lateral view4", + """ + |create table src_lv1 (key string, value string); + |create table src_lv2 (key string, value string); + | + |FROM src + |insert overwrite table src_lv1 SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX + |insert overwrite table src_lv2 SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX + """.stripMargin) + + createQueryTest("lateral view5", + "FROM src SELECT explode(array(key+3, key+4))") + + createQueryTest("lateral view6", + "SELECT * FROM src LATERAL VIEW explode(map(key+3,key+4)) D as k, v") + + test("sampling") { + sql("SELECT * FROM src TABLESAMPLE(0.1 PERCENT) s") + } + + test("SchemaRDD toString") { + sql("SHOW TABLES").toString + sql("SELECT * FROM src").toString + } + + createQueryTest("case statements with key #1", + "SELECT (CASE 1 WHEN 2 THEN 3 END) FROM src where key < 15") + + createQueryTest("case statements with key #2", + "SELECT (CASE key WHEN 2 THEN 3 ELSE 0 END) FROM src WHERE key < 15") + + createQueryTest("case statements with key #3", + "SELECT (CASE key WHEN 2 THEN 3 WHEN NULL THEN 4 END) FROM src WHERE key < 15") + + createQueryTest("case statements with key #4", + "SELECT (CASE key WHEN 2 THEN 3 WHEN NULL THEN 4 ELSE 0 END) FROM src WHERE key < 15") + + createQueryTest("case statements WITHOUT key #1", + "SELECT (CASE WHEN key > 2 THEN 3 END) FROM src WHERE key < 15") + + createQueryTest("case statements WITHOUT key #2", + "SELECT (CASE WHEN key > 2 THEN 3 ELSE 4 END) FROM src WHERE key < 15") + + createQueryTest("case statements WITHOUT key #3", + "SELECT (CASE WHEN key > 2 THEN 3 WHEN 2 > key THEN 2 END) FROM src WHERE key < 15") + + createQueryTest("case statements WITHOUT key #4", + "SELECT (CASE WHEN key > 2 THEN 3 WHEN 2 > key THEN 2 ELSE 0 END) FROM src WHERE key < 15") + + // Jdk version leads to different query output for double, so not use createQueryTest here + test("timestamp cast #1") { + val res = sql("SELECT CAST(CAST(1 AS TIMESTAMP) AS DOUBLE) FROM src LIMIT 1").collect().head + assert(0.001 == res.getDouble(0)) + } + + createQueryTest("timestamp cast #2", + "SELECT CAST(CAST(1.2 AS TIMESTAMP) AS DOUBLE) FROM src LIMIT 1") + + createQueryTest("timestamp cast #3", + "SELECT CAST(CAST(1200 AS TIMESTAMP) AS INT) FROM src LIMIT 1") + + createQueryTest("timestamp cast #4", + "SELECT CAST(CAST(1.2 AS TIMESTAMP) AS DOUBLE) FROM src LIMIT 1") + + createQueryTest("timestamp cast #5", + "SELECT CAST(CAST(-1 AS TIMESTAMP) AS DOUBLE) FROM src LIMIT 1") + + createQueryTest("timestamp cast #6", + "SELECT CAST(CAST(-1.2 AS TIMESTAMP) AS DOUBLE) FROM src LIMIT 1") + + createQueryTest("timestamp cast #7", + "SELECT CAST(CAST(-1200 AS TIMESTAMP) AS INT) FROM src LIMIT 1") + + createQueryTest("timestamp cast #8", + "SELECT CAST(CAST(-1.2 AS TIMESTAMP) AS DOUBLE) FROM src LIMIT 1") + + createQueryTest("select null from table", + "SELECT null FROM src LIMIT 1") + + test("predicates contains an empty AttributeSet() references") { + sql( + """ + |SELECT a FROM ( + | SELECT 1 AS a FROM src LIMIT 1 ) table + |WHERE abs(20141202) is not null + """.stripMargin).collect() + } + + test("implement identity function using case statement") { + val actual = sql("SELECT (CASE key WHEN key THEN key END) FROM src") + .map { case Row(i: Int) => i } + .collect() + .toSet + + val expected = sql("SELECT key FROM src") + .map { case Row(i: Int) => i } + .collect() + .toSet + + assert(actual === expected) + } + + // TODO: adopt this test when Spark SQL has the functionality / framework to report errors. + // See https://github.com/apache/spark/pull/1055#issuecomment-45820167 for a discussion. + ignore("non-boolean conditions in a CaseWhen are illegal") { + intercept[Exception] { + sql("SELECT (CASE WHEN key > 2 THEN 3 WHEN 1 THEN 2 ELSE 0 END) FROM src").collect() + } + } + + createQueryTest("case sensitivity when query Hive table", + "SELECT srcalias.KEY, SRCALIAS.value FROM sRc SrCAlias WHERE SrCAlias.kEy < 15") + + test("case sensitivity: registered table") { + val testData = + TestHive.sparkContext.parallelize( + TestData(1, "str1") :: + TestData(2, "str2") :: Nil) + testData.registerTempTable("REGisteredTABle") + + assertResult(Array(Array(2, "str2"))) { + sql("SELECT tablealias.A, TABLEALIAS.b FROM reGisteredTABle TableAlias " + + "WHERE TableAliaS.a > 1").collect() + } + } + + def isExplanation(result: SchemaRDD) = { + val explanation = result.select('plan).collect().map { case Row(plan: String) => plan } + explanation.contains("== Physical Plan ==") + } + + test("SPARK-1704: Explain commands as a SchemaRDD") { + sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") + + val rdd = sql("explain select key, count(value) from src group by key") + assert(isExplanation(rdd)) + + TestHive.reset() + } + + test("SPARK-2180: HAVING support in GROUP BY clauses (positive)") { + val fixture = List(("foo", 2), ("bar", 1), ("foo", 4), ("bar", 3)) + .zipWithIndex.map {case Pair(Pair(value, attr), key) => HavingRow(key, value, attr)} + TestHive.sparkContext.parallelize(fixture).registerTempTable("having_test") + val results = + sql("SELECT value, max(attr) AS attr FROM having_test GROUP BY value HAVING attr > 3") + .collect() + .map(x => Pair(x.getString(0), x.getInt(1))) + + assert(results === Array(Pair("foo", 4))) + TestHive.reset() + } + + test("SPARK-2180: HAVING with non-boolean clause raises no exceptions") { + sql("select key, count(*) c from src group by key having c").collect() + } + + test("SPARK-2225: turn HAVING without GROUP BY into a simple filter") { + assert(sql("select key from src having key > 490").collect().size < 100) + } + + test("Query Hive native command execution result") { + val tableName = "test_native_commands" + + assertResult(0) { + sql(s"DROP TABLE IF EXISTS $tableName").count() + } + + assertResult(0) { + sql(s"CREATE TABLE $tableName(key INT, value STRING)").count() + } + + assert( + sql("SHOW TABLES") + .select('result) + .collect() + .map(_.getString(0)) + .contains(tableName)) + + assert(isExplanation(sql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key"))) + + TestHive.reset() + } + + test("Exactly once semantics for DDL and command statements") { + val tableName = "test_exactly_once" + val q0 = sql(s"CREATE TABLE $tableName(key INT, value STRING)") + + // If the table was not created, the following assertion would fail + assert(Try(table(tableName)).isSuccess) + + // If the CREATE TABLE command got executed again, the following assertion would fail + assert(Try(q0.count()).isSuccess) + } + + test("DESCRIBE commands") { + sql(s"CREATE TABLE test_describe_commands1 (key INT, value STRING) PARTITIONED BY (dt STRING)") + + sql( + """FROM src INSERT OVERWRITE TABLE test_describe_commands1 PARTITION (dt='2008-06-08') + |SELECT key, value + """.stripMargin) + + // Describe a table + assertResult( + Array( + Array("key", "int", null), + Array("value", "string", null), + Array("dt", "string", null), + Array("# Partition Information", "", ""), + Array("# col_name", "data_type", "comment"), + Array("dt", "string", null)) + ) { + sql("DESCRIBE test_describe_commands1") + .select('col_name, 'data_type, 'comment) + .collect() + } + + // Describe a table with a fully qualified table name + assertResult( + Array( + Array("key", "int", null), + Array("value", "string", null), + Array("dt", "string", null), + Array("# Partition Information", "", ""), + Array("# col_name", "data_type", "comment"), + Array("dt", "string", null)) + ) { + sql("DESCRIBE default.test_describe_commands1") + .select('col_name, 'data_type, 'comment) + .collect() + } + + // Describe a column is a native command + assertResult(Array(Array("value", "string", "from deserializer"))) { + sql("DESCRIBE test_describe_commands1 value") + .select('result) + .collect() + .map(_.getString(0).split("\t").map(_.trim)) + } + + // Describe a column is a native command + assertResult(Array(Array("value", "string", "from deserializer"))) { + sql("DESCRIBE default.test_describe_commands1 value") + .select('result) + .collect() + .map(_.getString(0).split("\t").map(_.trim)) + } + + // Describe a partition is a native command + assertResult( + Array( + Array("key", "int"), + Array("value", "string"), + Array("dt", "string"), + Array(""), + Array("# Partition Information"), + Array("# col_name", "data_type", "comment"), + Array(""), + Array("dt", "string")) + ) { + sql("DESCRIBE test_describe_commands1 PARTITION (dt='2008-06-08')") + .select('result) + .collect() + .map(_.getString(0).replaceAll("None", "").trim.split("\t").map(_.trim)) + } + + // Describe a registered temporary table. + val testData = + TestHive.sparkContext.parallelize( + TestData(1, "str1") :: + TestData(1, "str2") :: Nil) + testData.registerTempTable("test_describe_commands2") + + assertResult( + Array( + Array("# Registered as a temporary table", null, null), + Array("a", "IntegerType", null), + Array("b", "StringType", null)) + ) { + sql("DESCRIBE test_describe_commands2") + .select('col_name, 'data_type, 'comment) + .collect() + } + } + + test("SPARK-2263: Insert Map values") { + sql("CREATE TABLE m(value MAP)") + sql("INSERT OVERWRITE TABLE m SELECT MAP(key, value) FROM src LIMIT 10") + sql("SELECT * FROM m").collect().zip(sql("SELECT * FROM src LIMIT 10").collect()).map { + case (Row(map: Map[_, _]), Row(key: Int, value: String)) => + assert(map.size === 1) + assert(map.head === (key, value)) + } + } + + test("ADD JAR command") { + val testJar = TestHive.getHiveFile("data/files/TestSerDe.jar").getCanonicalPath + sql("CREATE TABLE alter1(a INT, b INT)") + intercept[Exception] { + sql( + """ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe' + |WITH serdeproperties('s1'='9') + """.stripMargin) + } + // Now only verify 0.12.0, and ignore other versions due to binary compatibility + // current TestSerDe.jar is from 0.12.0 + if (HiveShim.version == "0.12.0") { + sql(s"ADD JAR $testJar") + sql( + """ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe' + |WITH serdeproperties('s1'='9') + """.stripMargin) + } + sql("DROP TABLE alter1") + } + + test("ADD FILE command") { + val testFile = TestHive.getHiveFile("data/files/v1.txt").getCanonicalFile + sql(s"ADD FILE $testFile") + + val checkAddFileRDD = sparkContext.parallelize(1 to 2, 1).mapPartitions { _ => + Iterator.single(new File(SparkFiles.get("v1.txt")).canRead) + } + + assert(checkAddFileRDD.first()) + } + + case class LogEntry(filename: String, message: String) + case class LogFile(name: String) + + createQueryTest("dynamic_partition", + """ + |DROP TABLE IF EXISTS dynamic_part_table; + |CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT); + | + |SET hive.exec.dynamic.partition.mode=nonstrict; + | + |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) + |SELECT 1, 1, 1 FROM src WHERE key=150; + | + |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) + |SELECT 1, NULL, 1 FROM src WHERE key=150; + | + |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) + |SELECT 1, 1, NULL FROM src WHERE key=150; + | + |INSERT INTO TABLe dynamic_part_table PARTITION(partcol1, partcol2) + |SELECT 1, NULL, NULL FROM src WHERE key=150; + | + |DROP TABLE IF EXISTS dynamic_part_table; + """.stripMargin) + + test("Dynamic partition folder layout") { + sql("DROP TABLE IF EXISTS dynamic_part_table") + sql("CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT)") + sql("SET hive.exec.dynamic.partition.mode=nonstrict") + + val data = Map( + Seq("1", "1") -> 1, + Seq("1", "NULL") -> 2, + Seq("NULL", "1") -> 3, + Seq("NULL", "NULL") -> 4) + + data.foreach { case (parts, value) => + sql( + s"""INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) + |SELECT $value, ${parts.mkString(", ")} FROM src WHERE key=150 + """.stripMargin) + + val partFolder = Seq("partcol1", "partcol2") + .zip(parts) + .map { case (k, v) => + if (v == "NULL") { + s"$k=${ConfVars.DEFAULTPARTITIONNAME.defaultVal}" + } else { + s"$k=$v" + } + } + .mkString("/") + + // Loads partition data to a temporary table to verify contents + val path = s"$warehousePath/dynamic_part_table/$partFolder/part-00000" + + sql("DROP TABLE IF EXISTS dp_verify") + sql("CREATE TABLE dp_verify(intcol INT)") + sql(s"LOAD DATA LOCAL INPATH '$path' INTO TABLE dp_verify") + + assert(sql("SELECT * FROM dp_verify").collect() === Array(Row(value))) + } + } + + test("Partition spec validation") { + sql("DROP TABLE IF EXISTS dp_test") + sql("CREATE TABLE dp_test(key INT, value STRING) PARTITIONED BY (dp INT, sp INT)") + sql("SET hive.exec.dynamic.partition.mode=strict") + + // Should throw when using strict dynamic partition mode without any static partition + intercept[SparkException] { + sql( + """INSERT INTO TABLE dp_test PARTITION(dp) + |SELECT key, value, key % 5 FROM src + """.stripMargin) + } + + sql("SET hive.exec.dynamic.partition.mode=nonstrict") + + // Should throw when a static partition appears after a dynamic partition + intercept[SparkException] { + sql( + """INSERT INTO TABLE dp_test PARTITION(dp, sp = 1) + |SELECT key, value, key % 5 FROM src + """.stripMargin) + } + } + + test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") { + sparkContext.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs") + sparkContext.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles") + + sql( + """ + SELECT name, message + FROM rawLogs + JOIN ( + SELECT name + FROM logFiles + ) files + ON rawLogs.filename = files.name + """).registerTempTable("boom") + + // This should be successfully analyzed + sql("SELECT * FROM boom").queryExecution.analyzed + } + + test("SPARK-3810: PreInsertionCasts static partitioning support") { + val analyzedPlan = { + loadTestTable("srcpart") + sql("DROP TABLE IF EXISTS withparts") + sql("CREATE TABLE withparts LIKE srcpart") + sql("INSERT INTO TABLE withparts PARTITION(ds='1', hr='2') SELECT key, value FROM src") + .queryExecution.analyzed + } + + assertResult(1, "Duplicated project detected\n" + analyzedPlan) { + analyzedPlan.collect { + case _: Project => () + }.size + } + } + + test("SPARK-3810: PreInsertionCasts dynamic partitioning support") { + val analyzedPlan = { + loadTestTable("srcpart") + sql("DROP TABLE IF EXISTS withparts") + sql("CREATE TABLE withparts LIKE srcpart") + sql("SET hive.exec.dynamic.partition.mode=nonstrict") + + sql("CREATE TABLE IF NOT EXISTS withparts LIKE srcpart") + sql("INSERT INTO TABLE withparts PARTITION(ds, hr) SELECT key, value FROM src") + .queryExecution.analyzed + } + + assertResult(1, "Duplicated project detected\n" + analyzedPlan) { + analyzedPlan.collect { + case _: Project => () + }.size + } + } + + test("parse HQL set commands") { + // Adapted from its SQL counterpart. + val testKey = "spark.sql.key.usedfortestonly" + val testVal = "val0,val_1,val2.3,my_table" + + sql(s"set $testKey=$testVal") + assert(getConf(testKey, testVal + "_") == testVal) + + sql("set some.property=20") + assert(getConf("some.property", "0") == "20") + sql("set some.property = 40") + assert(getConf("some.property", "0") == "40") + + sql(s"set $testKey=$testVal") + assert(getConf(testKey, "0") == testVal) + + sql(s"set $testKey=") + assert(getConf(testKey, "0") == "") + } + + test("SET commands semantics for a HiveContext") { + // Adapted from its SQL counterpart. + val testKey = "spark.sql.key.usedfortestonly" + val testVal = "test.val.0" + val nonexistentKey = "nonexistent" + val KV = "([^=]+)=([^=]*)".r + def collectResults(rdd: SchemaRDD): Set[(String, String)] = + rdd.collect().map { + case Row(key: String, value: String) => key -> value + case Row(KV(key, value)) => key -> value + }.toSet + clear() + + // "SET" itself returns all config variables currently specified in SQLConf. + // TODO: Should we be listing the default here always? probably... + assert(sql("SET").collect().size == 0) + + assertResult(Set(testKey -> testVal)) { + collectResults(sql(s"SET $testKey=$testVal")) + } + + assert(hiveconf.get(testKey, "") == testVal) + assertResult(Set(testKey -> testVal))(collectResults(sql("SET"))) + assertResult(Set(testKey -> testVal))(collectResults(sql("SET -v"))) + + sql(s"SET ${testKey + testKey}=${testVal + testVal}") + assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) + assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) { + collectResults(sql("SET")) + } + assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) { + collectResults(sql("SET -v")) + } + + // "SET key" + assertResult(Set(testKey -> testVal)) { + collectResults(sql(s"SET $testKey")) + } + + assertResult(Set(nonexistentKey -> "")) { + collectResults(sql(s"SET $nonexistentKey")) + } + + clear() + } + + createQueryTest("select from thrift based table", + "SELECT * from src_thrift") + + // Put tests that depend on specific Hive settings before these last two test, + // since they modify /clear stuff. +} + +// for SPARK-2180 test +case class HavingRow(key: Int, value: String, attr: Int) From ae7c1396ad8cd64321ed795e69e67fe574815987 Mon Sep 17 00:00:00 2001 From: yantangzhai Date: Sat, 24 Jan 2015 16:03:23 +0800 Subject: [PATCH 17/17] update --- .../sql/hive/execution/HiveQuerySuite.scala | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 700a45edb11d6..df72be7746ac6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -226,7 +226,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { // Jdk version leads to different query output for double, so not use createQueryTest here test("division") { val res = sql("SELECT 2 / 1, 1 / 2, 1 / 3, 1 / COUNT(*) FROM src LIMIT 1").collect().head - Seq(2.0, 0.5, 0.3333333333333333, 0.002).zip(res).foreach( x => + Seq(2.0, 0.5, 0.3333333333333333, 0.002).zip(res.toSeq).foreach( x => assert(x._1 == x._2.asInstanceOf[Double])) } @@ -235,7 +235,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { test("Query expressed in SQL") { setConf("spark.sql.dialect", "sql") - assert(sql("SELECT 1").collect() === Array(Seq(1))) + assert(sql("SELECT 1").collect() === Array(Row(1))) setConf("spark.sql.dialect", "hiveql") } @@ -467,7 +467,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { TestData(2, "str2") :: Nil) testData.registerTempTable("REGisteredTABle") - assertResult(Array(Array(2, "str2"))) { + assertResult(Array(Row(2, "str2"))) { sql("SELECT tablealias.A, TABLEALIAS.b FROM reGisteredTABle TableAlias " + "WHERE TableAliaS.a > 1").collect() } @@ -553,12 +553,12 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { // Describe a table assertResult( Array( - Array("key", "int", null), - Array("value", "string", null), - Array("dt", "string", null), - Array("# Partition Information", "", ""), - Array("# col_name", "data_type", "comment"), - Array("dt", "string", null)) + Row("key", "int", null), + Row("value", "string", null), + Row("dt", "string", null), + Row("# Partition Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("dt", "string", null)) ) { sql("DESCRIBE test_describe_commands1") .select('col_name, 'data_type, 'comment) @@ -568,12 +568,12 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { // Describe a table with a fully qualified table name assertResult( Array( - Array("key", "int", null), - Array("value", "string", null), - Array("dt", "string", null), - Array("# Partition Information", "", ""), - Array("# col_name", "data_type", "comment"), - Array("dt", "string", null)) + Row("key", "int", null), + Row("value", "string", null), + Row("dt", "string", null), + Row("# Partition Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("dt", "string", null)) ) { sql("DESCRIBE default.test_describe_commands1") .select('col_name, 'data_type, 'comment) @@ -623,9 +623,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { assertResult( Array( - Array("# Registered as a temporary table", null, null), - Array("a", "IntegerType", null), - Array("b", "StringType", null)) + Row("a", "IntegerType", null), + Row("b", "StringType", null)) ) { sql("DESCRIBE test_describe_commands2") .select('col_name, 'data_type, 'comment) @@ -848,7 +847,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { case Row(key: String, value: String) => key -> value case Row(KV(key, value)) => key -> value }.toSet - clear() + conf.clear() // "SET" itself returns all config variables currently specified in SQLConf. // TODO: Should we be listing the default here always? probably... @@ -880,7 +879,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { collectResults(sql(s"SET $nonexistentKey")) } - clear() + conf.clear() } createQueryTest("select from thrift based table",