From 1088c30c0c965ee9f32170658c7b24c4c3b836af Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Sun, 6 Mar 2016 23:42:00 -0800 Subject: [PATCH 1/6] [SPARK-12719] SQL generation support for generators (including UDTF) --- .../sql/catalyst/analysis/unresolved.scala | 4 + .../apache/spark/sql/hive/SQLBuilder.scala | 56 +++++++++- .../sql/hive/LogicalPlanToSQLSuite.scala | 100 ++++++++++++++++++ 3 files changed, 157 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 01afa01ae95c5..5d4e5a528faf5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -253,6 +253,10 @@ case class MultiAlias(child: Expression, names: Seq[String]) override def toString: String = s"$child AS $names" + override def sql: String = { + val aliasNames = names.map(quoteIdentifier(_)).mkString(",") + s"${child.sql} AS ($aliasNames)" + } } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index 760335bba5d4a..9c44ea71b06be 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -24,6 +24,7 @@ import scala.util.control.NonFatal import org.apache.spark.Logging import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.MultiAlias import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CollapseProject import org.apache.spark.sql.catalyst.plans.logical._ @@ -94,6 +95,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi case Distinct(p: Project) => projectToSQL(p, isDistinct = true) + case g : Generate => + generateToSQL(g) + case p: Project => projectToSQL(p, isDistinct = false) @@ -208,15 +212,36 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi segments.map(_.trim).filter(_.nonEmpty).mkString(" ") private def projectToSQL(plan: Project, isDistinct: Boolean): String = { + val (projectListExprs, planToProcess) = getProjectListExprs(plan) build( "SELECT", if (isDistinct) "DISTINCT" else "", - plan.projectList.map(_.sql).mkString(", "), - if (plan.child == OneRowRelation) "" else "FROM", - toSQL(plan.child) + projectListExprs.map(_.sql).mkString(", "), + if (planToProcess == OneRowRelation) "" else "FROM", + toSQL(planToProcess) ) } + private def getProjectListExprs(plan: Project): (Seq[NamedExpression], LogicalPlan) = { + plan match { + case p @ Project(_, g: Generate) if g.qualifier.isEmpty => + // Only keep the first generated column in the list so that we can + // transform it to a Generator expression in the following step. + val projList = p.projectList.filter { + case e: Expression if g.generatorOutput.tail.exists(_.semanticEquals(e)) => false + case _ => true + } + val exprs = projList.map { + case e: Expression if g.generatorOutput.exists(_.semanticEquals(e)) => + val names = g.generatorOutput.map(_.name) + MultiAlias(g.generator, names) + case other => other + } + (exprs, g.child) + case _ => (plan.projectList, plan.child) + } + } + private def aggregateToSQL(plan: Aggregate): String = { val groupingSQL = plan.groupingExpressions.map(_.sql).mkString(", ") build( @@ -305,6 +330,30 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi (w.child.output ++ w.windowExpressions).map(_.sql).mkString(", "), if (w.child == OneRowRelation) "" else "FROM", toSQL(w.child) + } + + /* This function handles the SQL generation when generators are specified in the + * LATERAL VIEW clause. SQL generation of generators specified in projection lists + * are handled in projectToSQL. + * sample plan : + * +- Project [mycol2#192] + * +- Generate explode(myCol#191), true, false, Some(mytable2), [mycol2#192] + * +- Generate explode(array(array(1, 2, 3))), true, false, Some(mytable), [mycol#191] + * +- MetastoreRelation default, src, None + * + */ + private def generateToSQL(plan: Generate): String = { + val columnAliases = plan.generatorOutput.map(a => quoteIdentifier(a.name)).mkString(",") + val generatorAlias = if (plan.qualifier.isEmpty) "" else plan.qualifier.get + val outerClause = if (plan.outer) "OUTER" else "" + build( + toSQL(plan.child), + "LATERAL VIEW ", + outerClause, + plan.generator.sql, + quoteIdentifier(generatorAlias), + "AS", + columnAliases ) } @@ -360,6 +409,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi case plan @ Project(_, _: SubqueryAlias + | _: Generate | _: Filter | _: Join | _: MetastoreRelation diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala index 198652b355fe2..649ed7a4d478d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala @@ -30,6 +30,17 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { sql("DROP TABLE IF EXISTS parquet_t1") sql("DROP TABLE IF EXISTS parquet_t2") sql("DROP TABLE IF EXISTS t0") + sql("DROP TABLE IF EXISTS t3") + sql("DROP TABLE IF EXISTS t4") + + val tuples: Seq[(String, String)] = + ("1", """{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") :: + ("2", """{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") :: + ("3", """{"f1": "value13", "f4": "value44", "f3": "value33", "f2": 2, "f5": 5.01}""") :: + ("4", null) :: + ("5", """{"f1": "", "f5": null}""") :: + ("6", "[invalid JSON string]") :: + Nil sqlContext.range(10).write.saveAsTable("parquet_t0") sql("CREATE TABLE t0 AS SELECT * FROM parquet_t0") @@ -45,6 +56,11 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { .select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd) .write .saveAsTable("parquet_t2") + + sqlContext.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write.saveAsTable("t2") + + tuples.toDF("key", "jstring").write.saveAsTable("t3") + sql("CREATE TABLE t4 as select key, array(value) as value from parquet_t1 limit 20") } override protected def afterAll(): Unit = { @@ -52,6 +68,8 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { sql("DROP TABLE IF EXISTS parquet_t1") sql("DROP TABLE IF EXISTS parquet_t2") sql("DROP TABLE IF EXISTS t0") + sql("DROP TABLE IF EXISTS t3") + sql("DROP TABLE IF EXISTS t4") } private def checkHiveQl(hiveQl: String): Unit = { @@ -550,4 +568,86 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { |WINDOW w AS (PARTITION BY key % 5 ORDER BY key) """.stripMargin) } + + test("SQL generation for generate") { + sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}") + // The function source code can be found at: + // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF + sql( + """ + |CREATE TEMPORARY FUNCTION udtf_count2 + |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2' + """.stripMargin) + + // Basic Explode + checkHiveQl("SELECT explode(array(1,2,3)) FROM src") + + // Explode with Alias + checkHiveQl("SELECT explode(array(1,2,3)) as value FROM src") + + // Explode without FROM + checkHiveQl("select explode(array(1,2,3)) AS gencol") + + // Explode with columns other than generated columns in projection list + checkHiveQl("SELECT key as c1, explode(array(1,2,3)) as c2, value as c3 FROM t4") + + // Explode with a column reference as input to generator + checkHiveQl("SELECT key, value from t4 LATERAL VIEW explode(value) gentab") + + // json_tuple + checkHiveQl("SELECT key, json_tuple(jstring, 'f1', 'f2', 'f3', 'f4', 'f5') FROM t3") + + // udtf + checkHiveQl("SELECT key, gencol FROM t4 LATERAL VIEW udtf_count2(value) gentab AS gencol") + + // udtf + checkHiveQl("SELECT udtf_count2(c1) FROM (SELECT 1 AS c1 FROM t4 LIMIT 3) g1") + + // Filter and OUTER clause + checkHiveQl( + """SELECT key, value + |FROM t4 LATERAL VIEW OUTER explode(value) gentab as gencol + |WHERE key = 1 + """.stripMargin + ) + + // single lateral view + checkHiveQl( + """SELECT * + |FROM t4 LATERAL VIEW explode(array(1,2,3)) gentab AS gencol + |SORT BY key ASC, gencol ASC LIMIT 1 + """.stripMargin + ) + + // multiple lateral views + checkHiveQl( + """SELECT gentab2.* + |FROM t4 + |LATERAL VIEW explode(array(array(1,2,3))) gentab1 AS gencol1 + |LATERAL VIEW explode(gentab1.gencol1) gentab2 AS gencol2 LIMIT 3 + """.stripMargin + ) + + // Subquries in FROM clause using Generate + checkHiveQl( + """SELECT subq.gencol + |FROM + |(SELECT * from t4 LATERAL VIEW explode(value) gentab AS gencol) subq + """.stripMargin) + + checkHiveQl( + """SELECT subq.key + |FROM + |(SELECT key, value from t4 LATERAL VIEW explode(value) gentab AS gencol) subq + """.stripMargin + ) + + checkHiveQl( + """SELECT gentab.* + |FROM + |t4 LATERAL VIEW explode(map('key1', 100, 'key2', 200)) gentab limit 2 + """.stripMargin + ) + sql("DROP TEMPORARY FUNCTION udtf_count2") + } } From 153b876767b2001e5847ee306d686db5ebfa48e6 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 7 Mar 2016 13:28:48 -0800 Subject: [PATCH 2/6] Code review comments --- .../apache/spark/sql/hive/SQLBuilder.scala | 4 +-- .../sql/hive/LogicalPlanToSQLSuite.scala | 36 +++++++++---------- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index 9c44ea71b06be..915f804b3caff 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -334,7 +334,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi /* This function handles the SQL generation when generators are specified in the * LATERAL VIEW clause. SQL generation of generators specified in projection lists - * are handled in projectToSQL. + * is handled in projectToSQL. * sample plan : * +- Project [mycol2#192] * +- Generate explode(myCol#191), true, false, Some(mytable2), [mycol2#192] @@ -348,7 +348,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi val outerClause = if (plan.outer) "OUTER" else "" build( toSQL(plan.child), - "LATERAL VIEW ", + "LATERAL VIEW", outerClause, plan.generator.sql, quoteIdentifier(generatorAlias), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala index 649ed7a4d478d..486bc8cf30297 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala @@ -29,9 +29,9 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { sql("DROP TABLE IF EXISTS parquet_t0") sql("DROP TABLE IF EXISTS parquet_t1") sql("DROP TABLE IF EXISTS parquet_t2") + sql("DROP TABLE IF EXISTS parquet_t3") sql("DROP TABLE IF EXISTS t0") - sql("DROP TABLE IF EXISTS t3") - sql("DROP TABLE IF EXISTS t4") + sql("DROP TABLE IF EXISTS t1") val tuples: Seq[(String, String)] = ("1", """{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") :: @@ -57,19 +57,17 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { .write .saveAsTable("parquet_t2") - sqlContext.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write.saveAsTable("t2") - - tuples.toDF("key", "jstring").write.saveAsTable("t3") - sql("CREATE TABLE t4 as select key, array(value) as value from parquet_t1 limit 20") + tuples.toDF("key", "jstring").write.saveAsTable("parquet_t3") + sql("CREATE TABLE t1 as select key, array(value) as value from parquet_t1 limit 20") } override protected def afterAll(): Unit = { sql("DROP TABLE IF EXISTS parquet_t0") sql("DROP TABLE IF EXISTS parquet_t1") sql("DROP TABLE IF EXISTS parquet_t2") + sql("DROP TABLE IF EXISTS parquet_t3") sql("DROP TABLE IF EXISTS t0") - sql("DROP TABLE IF EXISTS t3") - sql("DROP TABLE IF EXISTS t4") + sql("DROP TABLE IF EXISTS t1") } private def checkHiveQl(hiveQl: String): Unit = { @@ -589,24 +587,24 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { checkHiveQl("select explode(array(1,2,3)) AS gencol") // Explode with columns other than generated columns in projection list - checkHiveQl("SELECT key as c1, explode(array(1,2,3)) as c2, value as c3 FROM t4") + checkHiveQl("SELECT key as c1, explode(array(1,2,3)) as c2, value as c3 FROM t1") // Explode with a column reference as input to generator - checkHiveQl("SELECT key, value from t4 LATERAL VIEW explode(value) gentab") + checkHiveQl("SELECT key, value from t1 LATERAL VIEW explode(value) gentab") // json_tuple - checkHiveQl("SELECT key, json_tuple(jstring, 'f1', 'f2', 'f3', 'f4', 'f5') FROM t3") + checkHiveQl("SELECT key, json_tuple(jstring, 'f1', 'f2', 'f3', 'f4', 'f5') FROM parquet_t3") // udtf - checkHiveQl("SELECT key, gencol FROM t4 LATERAL VIEW udtf_count2(value) gentab AS gencol") + checkHiveQl("SELECT key, gencol FROM t1 LATERAL VIEW udtf_count2(value) gentab AS gencol") // udtf - checkHiveQl("SELECT udtf_count2(c1) FROM (SELECT 1 AS c1 FROM t4 LIMIT 3) g1") + checkHiveQl("SELECT udtf_count2(c1) FROM (SELECT 1 AS c1 FROM t1 LIMIT 3) g1") // Filter and OUTER clause checkHiveQl( """SELECT key, value - |FROM t4 LATERAL VIEW OUTER explode(value) gentab as gencol + |FROM t1 LATERAL VIEW OUTER explode(value) gentab as gencol |WHERE key = 1 """.stripMargin ) @@ -614,7 +612,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { // single lateral view checkHiveQl( """SELECT * - |FROM t4 LATERAL VIEW explode(array(1,2,3)) gentab AS gencol + |FROM t1 LATERAL VIEW explode(array(1,2,3)) gentab AS gencol |SORT BY key ASC, gencol ASC LIMIT 1 """.stripMargin ) @@ -622,7 +620,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { // multiple lateral views checkHiveQl( """SELECT gentab2.* - |FROM t4 + |FROM t1 |LATERAL VIEW explode(array(array(1,2,3))) gentab1 AS gencol1 |LATERAL VIEW explode(gentab1.gencol1) gentab2 AS gencol2 LIMIT 3 """.stripMargin @@ -632,20 +630,20 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { checkHiveQl( """SELECT subq.gencol |FROM - |(SELECT * from t4 LATERAL VIEW explode(value) gentab AS gencol) subq + |(SELECT * from t1 LATERAL VIEW explode(value) gentab AS gencol) subq """.stripMargin) checkHiveQl( """SELECT subq.key |FROM - |(SELECT key, value from t4 LATERAL VIEW explode(value) gentab AS gencol) subq + |(SELECT key, value from t1 LATERAL VIEW explode(value) gentab AS gencol) subq """.stripMargin ) checkHiveQl( """SELECT gentab.* |FROM - |t4 LATERAL VIEW explode(map('key1', 100, 'key2', 200)) gentab limit 2 + |t1 LATERAL VIEW explode(map('key1', 100, 'key2', 200)) gentab limit 2 """.stripMargin ) sql("DROP TEMPORARY FUNCTION udtf_count2") From 37d17cf02f8f7db6d21c0f54ef0421116bd1b9ca Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 7 Mar 2016 18:36:21 -0800 Subject: [PATCH 3/6] Review comments from Reynold --- .../sql/hive/LogicalPlanToSQLSuite.scala | 59 ++++++++++--------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala index 486bc8cf30297..566f258e557fd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala @@ -567,16 +567,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { """.stripMargin) } - test("SQL generation for generate") { - sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}") - // The function source code can be found at: - // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF - sql( - """ - |CREATE TEMPORARY FUNCTION udtf_count2 - |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2' - """.stripMargin) - + test("SQL generator for explode in projection list") { // Basic Explode checkHiveQl("SELECT explode(array(1,2,3)) FROM src") @@ -586,21 +577,15 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { // Explode without FROM checkHiveQl("select explode(array(1,2,3)) AS gencol") - // Explode with columns other than generated columns in projection list + // non-generated columns in projection list checkHiveQl("SELECT key as c1, explode(array(1,2,3)) as c2, value as c3 FROM t1") + } - // Explode with a column reference as input to generator - checkHiveQl("SELECT key, value from t1 LATERAL VIEW explode(value) gentab") - - // json_tuple + test("SQL generation for json_tuple as generator") { checkHiveQl("SELECT key, json_tuple(jstring, 'f1', 'f2', 'f3', 'f4', 'f5') FROM parquet_t3") + } - // udtf - checkHiveQl("SELECT key, gencol FROM t1 LATERAL VIEW udtf_count2(value) gentab AS gencol") - - // udtf - checkHiveQl("SELECT udtf_count2(c1) FROM (SELECT 1 AS c1 FROM t1 LIMIT 3) g1") - + test("SQL generation for lateral views") { // Filter and OUTER clause checkHiveQl( """SELECT key, value @@ -626,6 +611,16 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { """.stripMargin ) + // No generated column aliases + checkHiveQl( + """SELECT gentab.* + |FROM + |t1 LATERAL VIEW explode(map('key1', 100, 'key2', 200)) gentab limit 2 + """.stripMargin + ) + } + + test("SQL generation for lateral views in subquery") { // Subquries in FROM clause using Generate checkHiveQl( """SELECT subq.gencol @@ -639,13 +634,23 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { |(SELECT key, value from t1 LATERAL VIEW explode(value) gentab AS gencol) subq """.stripMargin ) + } + + test("SQL generation for UDTF") { + sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}") + + // The function source code can be found at: + // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF + sql( + """ + |CREATE TEMPORARY FUNCTION udtf_count2 + |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2' + """.stripMargin) + + checkHiveQl("SELECT key, gencol FROM t1 LATERAL VIEW udtf_count2(value) gentab AS gencol") + + checkHiveQl("SELECT udtf_count2(c1) FROM (SELECT 1 AS c1 FROM t1 LIMIT 3) g1") - checkHiveQl( - """SELECT gentab.* - |FROM - |t1 LATERAL VIEW explode(map('key1', 100, 'key2', 200)) gentab limit 2 - """.stripMargin - ) sql("DROP TEMPORARY FUNCTION udtf_count2") } } From cd0c001367aa3a39b9a26e4162b733bea225812f Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 7 Mar 2016 23:35:46 -0800 Subject: [PATCH 4/6] Alternative approach. --- .../sql/catalyst/analysis/unresolved.scala | 4 - .../apache/spark/sql/hive/SQLBuilder.scala | 74 +++++++++++-------- .../sql/hive/LogicalPlanToSQLSuite.scala | 25 ++++--- 3 files changed, 60 insertions(+), 43 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 5d4e5a528faf5..01afa01ae95c5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -253,10 +253,6 @@ case class MultiAlias(child: Expression, names: Seq[String]) override def toString: String = s"$child AS $names" - override def sql: String = { - val aliasNames = names.map(quoteIdentifier(_)).mkString(",") - s"${child.sql} AS ($aliasNames)" - } } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index 915f804b3caff..3d1498f420ed3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -24,7 +24,6 @@ import scala.util.control.NonFatal import org.apache.spark.Logging import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.MultiAlias import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CollapseProject import org.apache.spark.sql.catalyst.plans.logical._ @@ -95,7 +94,10 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi case Distinct(p: Project) => projectToSQL(p, isDistinct = true) - case g : Generate => + case p@ Project(_, g: Generate) => + generateToSQL(p) + + case g: Generate => generateToSQL(g) case p: Project => @@ -212,36 +214,15 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi segments.map(_.trim).filter(_.nonEmpty).mkString(" ") private def projectToSQL(plan: Project, isDistinct: Boolean): String = { - val (projectListExprs, planToProcess) = getProjectListExprs(plan) build( "SELECT", if (isDistinct) "DISTINCT" else "", - projectListExprs.map(_.sql).mkString(", "), - if (planToProcess == OneRowRelation) "" else "FROM", - toSQL(planToProcess) + plan.projectList.map(_.sql).mkString(", "), + if (plan.child == OneRowRelation) "" else "FROM", + toSQL(plan.child) ) } - private def getProjectListExprs(plan: Project): (Seq[NamedExpression], LogicalPlan) = { - plan match { - case p @ Project(_, g: Generate) if g.qualifier.isEmpty => - // Only keep the first generated column in the list so that we can - // transform it to a Generator expression in the following step. - val projList = p.projectList.filter { - case e: Expression if g.generatorOutput.tail.exists(_.semanticEquals(e)) => false - case _ => true - } - val exprs = projList.map { - case e: Expression if g.generatorOutput.exists(_.semanticEquals(e)) => - val names = g.generatorOutput.map(_.name) - MultiAlias(g.generator, names) - case other => other - } - (exprs, g.child) - case _ => (plan.projectList, plan.child) - } - } - private def aggregateToSQL(plan: Aggregate): String = { val groupingSQL = plan.groupingExpressions.map(_.sql).mkString(", ") build( @@ -332,9 +313,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi toSQL(w.child) } - /* This function handles the SQL generation when generators are specified in the - * LATERAL VIEW clause. SQL generation of generators specified in projection lists - * is handled in projectToSQL. + /* This function handles the SQL generation for generators. * sample plan : * +- Project [mycol2#192] * +- Generate explode(myCol#191), true, false, Some(mytable2), [mycol2#192] @@ -347,7 +326,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi val generatorAlias = if (plan.qualifier.isEmpty) "" else plan.qualifier.get val outerClause = if (plan.outer) "OUTER" else "" build( - toSQL(plan.child), + if (plan.child == OneRowRelation) s"(SELECT 1) ${SQLBuilder.newSubqueryName}" else toSQL(plan.child), "LATERAL VIEW", outerClause, plan.generator.sql, @@ -357,6 +336,37 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi ) } + private def generateToSQL(plan: Project): String = { + // assert if child is a generate or not. + val generate = plan.child.asInstanceOf[Generate] + // Generators that appear in projection list will be expressed as LATERAL VIEW. + // A qualifier is needed for a LATERAL VIEw. + val generatorAlias: String = generate.qualifier.getOrElse(SQLBuilder.newGeneratorName) + + // Qualify the attributes in projection list. + val newProjList = plan.projectList.map { + case a if generate.generatorOutput.exists(_.semanticEquals(a)) => + a.toAttribute.withQualifiers(Seq(generatorAlias)) + case o => o + } + + //If Generate is missing the qualifier (its in projection list) , add one here. + val planToProcess = + if (generate.qualifier.isEmpty) { + generate.copy(qualifier = Some(generatorAlias)) + } + else { + generate + } + + build( + "SELECT", + newProjList.map(a => a.sql).mkString(","), + "FROM", + toSQL(planToProcess) + ) + } + object Canonicalizer extends RuleExecutor[LogicalPlan] { override protected def batches: Seq[Batch] = Seq( Batch("Collapse Project", FixedPoint(100), @@ -461,4 +471,8 @@ object SQLBuilder { private val nextSubqueryId = new AtomicLong(0) private def newSubqueryName: String = s"gen_subquery_${nextSubqueryId.getAndIncrement()}" + + private val nextGeneratorId = new AtomicLong(0) + + private def newGeneratorName: String = s"gen_generator_${nextGeneratorId.getAndIncrement()}" } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala index 566f258e557fd..251c746ebc636 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala @@ -588,23 +588,28 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { test("SQL generation for lateral views") { // Filter and OUTER clause checkHiveQl( - """SELECT key, value - |FROM t1 LATERAL VIEW OUTER explode(value) gentab as gencol + """ + |SELECT key, value + |FROM t1 + |LATERAL VIEW OUTER explode(value) gentab as gencol |WHERE key = 1 """.stripMargin ) // single lateral view checkHiveQl( - """SELECT * - |FROM t1 LATERAL VIEW explode(array(1,2,3)) gentab AS gencol + """ + |SELECT * + |FROM t1 + |LATERAL VIEW explode(array(1,2,3)) gentab AS gencol |SORT BY key ASC, gencol ASC LIMIT 1 """.stripMargin ) // multiple lateral views checkHiveQl( - """SELECT gentab2.* + """ + |SELECT gentab2.* |FROM t1 |LATERAL VIEW explode(array(array(1,2,3))) gentab1 AS gencol1 |LATERAL VIEW explode(gentab1.gencol1) gentab2 AS gencol2 LIMIT 3 @@ -614,8 +619,8 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { // No generated column aliases checkHiveQl( """SELECT gentab.* - |FROM - |t1 LATERAL VIEW explode(map('key1', 100, 'key2', 200)) gentab limit 2 + |FROM t1 + |LATERAL VIEW explode(map('key1', 100, 'key2', 200)) gentab limit 2 """.stripMargin ) } @@ -623,13 +628,15 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { test("SQL generation for lateral views in subquery") { // Subquries in FROM clause using Generate checkHiveQl( - """SELECT subq.gencol + """ + |SELECT subq.gencol |FROM |(SELECT * from t1 LATERAL VIEW explode(value) gentab AS gencol) subq """.stripMargin) checkHiveQl( - """SELECT subq.key + """ + |SELECT subq.key |FROM |(SELECT key, value from t1 LATERAL VIEW explode(value) gentab AS gencol) subq """.stripMargin From 53e9934cd71ec8fb23f7c6db1405991f205a0aba Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 8 Mar 2016 21:50:15 -0800 Subject: [PATCH 5/6] scala style fix --- .../scala/org/apache/spark/sql/hive/SQLBuilder.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index 3d1498f420ed3..e767b12c7a8de 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -326,7 +326,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi val generatorAlias = if (plan.qualifier.isEmpty) "" else plan.qualifier.get val outerClause = if (plan.outer) "OUTER" else "" build( - if (plan.child == OneRowRelation) s"(SELECT 1) ${SQLBuilder.newSubqueryName}" else toSQL(plan.child), + if (plan.child == OneRowRelation) { + s"(SELECT 1) ${SQLBuilder.newSubqueryName}" + } + else { + toSQL(plan.child) + }, "LATERAL VIEW", outerClause, plan.generator.sql, @@ -350,7 +355,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi case o => o } - //If Generate is missing the qualifier (its in projection list) , add one here. + // If Generate is missing the qualifier (its in projection list) , add one here. val planToProcess = if (generate.qualifier.isEmpty) { generate.copy(qualifier = Some(generatorAlias)) From bea871f6b43de7ffcb6c58089521401ca39a7145 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Thu, 10 Mar 2016 23:16:41 -0800 Subject: [PATCH 6/6] Address review comments --- .../scala/org/apache/spark/sql/hive/SQLBuilder.scala | 11 ++++++----- .../apache/spark/sql/hive/LogicalPlanToSQLSuite.scala | 9 +++++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index e767b12c7a8de..71c78313e8614 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -94,7 +94,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi case Distinct(p: Project) => projectToSQL(p, isDistinct = true) - case p@ Project(_, g: Generate) => + case p @ Project(_, g: Generate) => generateToSQL(p) case g: Generate => @@ -311,18 +311,19 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi (w.child.output ++ w.windowExpressions).map(_.sql).mkString(", "), if (w.child == OneRowRelation) "" else "FROM", toSQL(w.child) + ) } - /* This function handles the SQL generation for generators. + /** + * This function handles the SQL generation for generators. * sample plan : * +- Project [mycol2#192] * +- Generate explode(myCol#191), true, false, Some(mytable2), [mycol2#192] * +- Generate explode(array(array(1, 2, 3))), true, false, Some(mytable), [mycol#191] * +- MetastoreRelation default, src, None - * */ private def generateToSQL(plan: Generate): String = { - val columnAliases = plan.generatorOutput.map(a => quoteIdentifier(a.name)).mkString(",") + val columnAliases = plan.generatorOutput.map(_.sql).mkString(",") val generatorAlias = if (plan.qualifier.isEmpty) "" else plan.qualifier.get val outerClause = if (plan.outer) "OUTER" else "" build( @@ -342,8 +343,8 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi } private def generateToSQL(plan: Project): String = { - // assert if child is a generate or not. val generate = plan.child.asInstanceOf[Generate] + assert(generate.join == true || plan.projectList.size == 1) // Generators that appear in projection list will be expressed as LATERAL VIEW. // A qualifier is needed for a LATERAL VIEw. val generatorAlias: String = generate.qualifier.getOrElse(SQLBuilder.newGeneratorName) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala index 251c746ebc636..8ecbe1fd33b00 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala @@ -585,6 +585,15 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { checkHiveQl("SELECT key, json_tuple(jstring, 'f1', 'f2', 'f3', 'f4', 'f5') FROM parquet_t3") } + test("Lateral view with join") { + checkHiveQl( + """ + |SELECT gencol, explode(array(1,2,3)), x1.key + |FROM (t1 LATERAL VIEW OUTER explode(value) gentab as gencol), t1 as x1 + """.stripMargin + ) + } + test("SQL generation for lateral views") { // Filter and OUTER clause checkHiveQl(