Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ private[hive] case class SourceCommand(filePath: String) extends Command

private[hive] case class AddFile(filePath: String) extends Command

private[hive] case class AddJar(path: String) extends Command

private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command

private[hive] case class AnalyzeTable(tableName: String) extends Command
Expand Down Expand Up @@ -231,7 +233,7 @@ private[hive] object HiveQl {
} else if (sql.trim.toLowerCase.startsWith("uncache table")) {
CacheCommand(sql.trim.drop(14).trim, false)
} else if (sql.trim.toLowerCase.startsWith("add jar")) {
NativeCommand(sql)
AddJar(sql.trim.drop(8).trim)
} else if (sql.trim.toLowerCase.startsWith("add file")) {
AddFile(sql.trim.drop(9))
} else if (sql.trim.toLowerCase.startsWith("dfs")) {
Expand Down Expand Up @@ -1018,9 +1020,9 @@ private[hive] object HiveQl {

/* Other functions */
case Token("TOK_FUNCTION", Token(RAND(), Nil) :: Nil) => Rand
case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) =>
case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) =>
Substring(nodeToExpr(string), nodeToExpr(pos), Literal(Integer.MAX_VALUE, IntegerType))
case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
Substring(nodeToExpr(string), nodeToExpr(pos), nodeToExpr(length))

/* UDFs - Must be last otherwise will preempt built in functions */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,12 @@ private[hive] trait HiveStrategies {

case class HiveCommandStrategy(context: HiveContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.NativeCommand(sql) =>
NativeCommand(sql, plan.output)(context) :: Nil
case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil

case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil

case hive.AddJar(path) => execution.AddJar(path) :: Nil

case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil

case describe: logical.DescribeCommand =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,19 @@ case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with
Seq.empty[Row]
}
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class AddJar(path: String) extends LeafNode with Command {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw SetCommand takes the context as the parameter, as well as the other Commands in the file commands.scala, like:

case class SetCommand(
    key: Option[String], value: Option[String], output: Seq[Attribute])(
    @transient context: SQLContext)

Should we follow the same pattern? The default sqlContext will take value from a ThreadLocal variable, which probably a little different.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ThreadLocal version was added in #993, and I see this as the more recommended manner to pass SQLContext/HiveContext since it saves boilerplate code, @marmbrus is it?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you should use the provide sqlContext. This will always be provided by the planner.

def hiveContext = sqlContext.asInstanceOf[HiveContext]

override def output = Seq.empty

override protected[sql] lazy val sideEffectResult: Seq[Row] = {
hiveContext.runSqlHive(s"ADD JAR $path")
hiveContext.sparkContext.addJar(path)
Seq.empty[Row]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.spark.sql.hive.execution

import java.io.File

import scala.util.Try

import org.apache.spark.sql.{SchemaRDD, Row}
import org.apache.spark.SparkException
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
Expand Down Expand Up @@ -313,7 +315,7 @@ class HiveQuerySuite extends HiveComparisonTest {
"SELECT srcalias.KEY, SRCALIAS.value FROM sRc SrCAlias WHERE SrCAlias.kEy < 15")

test("case sensitivity: registered table") {
val testData: SchemaRDD =
val testData =
TestHive.sparkContext.parallelize(
TestData(1, "str1") ::
TestData(2, "str2") :: Nil)
Expand Down Expand Up @@ -467,7 +469,7 @@ class HiveQuerySuite extends HiveComparisonTest {
}

// Describe a registered temporary table.
val testData: SchemaRDD =
val testData =
TestHive.sparkContext.parallelize(
TestData(1, "str1") ::
TestData(1, "str2") :: Nil)
Expand Down Expand Up @@ -495,6 +497,23 @@ class HiveQuerySuite extends HiveComparisonTest {
}
}

test("ADD JAR command") {
val testJar = TestHive.getHiveFile("data/files/TestSerDe.jar").getCanonicalPath
sql("CREATE TABLE alter1(a INT, b INT)")
intercept[Exception] {
sql(
"""ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe'
|WITH serdeproperties('s1'='9')
""".stripMargin)
}
sql(s"ADD JAR $testJar")
sql(
"""ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe'
|WITH serdeproperties('s1'='9')
""".stripMargin)
sql("DROP TABLE alter1")
}

test("parse HQL set commands") {
// Adapted from its SQL counterpart.
val testKey = "spark.sql.key.usedfortestonly"
Expand Down