From 3a545667d66e162430070eb0361ae1a96fa59f89 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 13 Jan 2023 18:58:22 +0800 Subject: [PATCH 1/9] [SPARK-42051][SQL] Codegen Support for HiveGenericUDF --- .../org/apache/spark/sql/hive/hiveUDFs.scala | 61 ++++++++++++++++--- .../sql/hive/execution/HiveUDFSuite.scala | 11 ++++ 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index a950c1a17839c..df20f8adf1ec1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -35,7 +35,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, CodegenFallback, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper import org.apache.spark.sql.hive.HiveShim._ import org.apache.spark.sql.types._ @@ -128,11 +129,10 @@ private[hive] class DeferredObjectAdapter(oi: ObjectInspector, dataType: DataTyp override def get(): AnyRef = wrapper(func()).asInstanceOf[AnyRef] } -private[hive] case class HiveGenericUDF( +case class HiveGenericUDF( name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) extends Expression with HiveInspectors - with CodegenFallback with Logging with UserDefinedExpression { @@ -154,8 +154,9 @@ private[hive] case class HiveGenericUDF( function.initializeAndFoldConstants(argumentInspectors.toArray) } + // Visible for codegen @transient - private lazy val unwrapper = unwrapperFor(returnInspector) + lazy val unwrapper = unwrapperFor(returnInspector) @transient private lazy val isUDFDeterministic = { @@ -163,10 +164,13 @@ private[hive] case class HiveGenericUDF( udfType != null && udfType.deterministic() && !udfType.stateful() } + // Visible for codegen @transient - private lazy val deferredObjects = argumentInspectors.zip(children).map { case (inspect, child) => - new DeferredObjectAdapter(inspect, child.dataType) - }.toArray[DeferredObject] + lazy val deferredObjects = { + argumentInspectors.zip(children).map { case (inspect, child) => + new DeferredObjectAdapter(inspect, child.dataType) + }.toArray[DeferredObject] + } override lazy val dataType: DataType = inspectorToDataType(returnInspector) @@ -192,6 +196,49 @@ private[hive] case class HiveGenericUDF( override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = copy(children = newChildren) + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val refTerm = ctx.addReferenceObj("this", this) + val childrenEvals = children.map(_.genCode(ctx)) + + val setDeferredObjects = childrenEvals.zipWithIndex.zip(children.map(_.dataType)).map { + case ((eval, i), dt) => + val funcTerm = ctx.freshName("func") + val ft = CodeGenerator.boxedType(dt) + val deferredObjectAdapterClz = classOf[DeferredObjectAdapter].getCanonicalName + s""" + |scala.Function0<$ft> $funcTerm = new scala.Function0() { + | @Override + | public $ft apply() { + | return ${eval.isNull} ? null : ${eval.value}; + | } + |}; + |(($deferredObjectAdapterClz) $refTerm.deferredObjects()[$i]).set($funcTerm); + |""".stripMargin + } + + val resultType = CodeGenerator.javaType(dataType) + ev.copy(code = + code""" + |${childrenEvals.map(_.code).mkString("\n")} + | + |${setDeferredObjects.mkString("\n")} + | + |$resultType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; + |boolean ${ev.isNull} = false; + |try { + | ${ev.value} = ($resultType) $refTerm.unwrapper().apply( + | $refTerm.function().evaluate($refTerm.deferredObjects())); + | ${ev.isNull} = ${ev.value} == null; + |} catch (Throwable e) { + | throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError( + | "${funcWrapper.functionClassName}", + | "${children.map(_.dataType.catalogString).mkString(", ")}", + | "${dataType.catalogString}", + | e); + |} + |""".stripMargin + ) + } } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index f494232502f75..e5cfe3cebc787 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -35,6 +35,7 @@ import org.apache.hadoop.io.{LongWritable, Writable} import org.apache.spark.{SparkFiles, TestUtils} import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.execution.WholeStageCodegenExec import org.apache.spark.sql.functions.max import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -711,6 +712,16 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } } } + + test("SPARK-42051: HiveGenericUDF Codegen Support") { + withUserDefinedFunction("CodeGenHiveGenericUDF" -> false) { + sql(s"CREATE FUNCTION CodeGenHiveGenericUDF AS '${classOf[GenericUDFMaskHash].getName}'") + val df = sql("SELECT CodeGenHiveGenericUDF('Spark SQL')") + val plan = df.queryExecution.executedPlan + assert(plan.isInstanceOf[WholeStageCodegenExec]) + checkAnswer(df, Seq(Row("14ab8df5135825bc9f5ff7c30609f02f"))) + } + } } class TestPair(x: Int, y: Int) extends Writable with Serializable { From 362197ec1b28c9e50a618dbe2a2827fdf57a1597 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 13 Jan 2023 19:08:00 +0800 Subject: [PATCH 2/9] [SPARK-42051][SQL] Codegen Support for HiveGenericUDF --- .../src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index df20f8adf1ec1..b5bad3344af1e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -166,11 +166,10 @@ case class HiveGenericUDF( // Visible for codegen @transient - lazy val deferredObjects = { + lazy val deferredObjects = argumentInspectors.zip(children).map { case (inspect, child) => new DeferredObjectAdapter(inspect, child.dataType) }.toArray[DeferredObject] - } override lazy val dataType: DataType = inspectorToDataType(returnInspector) From 28a3c9c811c317dd3f0fc778f27d1feadfbc0de8 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 13 Jan 2023 19:08:59 +0800 Subject: [PATCH 3/9] [SPARK-42051][SQL] Codegen Support for HiveGenericUDF --- .../main/scala/org/apache/spark/sql/hive/hiveUDFs.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index b5bad3344af1e..d359a953e108a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -166,10 +166,9 @@ case class HiveGenericUDF( // Visible for codegen @transient - lazy val deferredObjects = - argumentInspectors.zip(children).map { case (inspect, child) => - new DeferredObjectAdapter(inspect, child.dataType) - }.toArray[DeferredObject] + lazy val deferredObjects = argumentInspectors.zip(children).map { case (inspect, child) => + new DeferredObjectAdapter(inspect, child.dataType) + }.toArray[DeferredObject] override lazy val dataType: DataType = inspectorToDataType(returnInspector) From 257d484e697f007398215efaa380e5b2693bf23b Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 16 Jan 2023 11:05:39 +0800 Subject: [PATCH 4/9] fix 'Cannot access non-final local variable from inner class' --- .../org/apache/spark/sql/hive/hiveUDFs.scala | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index d359a953e108a..aa843add27476 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -121,15 +121,15 @@ private[hive] class DeferredObjectAdapter(oi: ObjectInspector, dataType: DataTyp extends DeferredObject with HiveInspectors { private val wrapper = wrapperFor(oi, dataType) - private var func: () => Any = _ - def set(func: () => Any): Unit = { + private var func: Any = _ + def set(func: Any): Unit = { this.func = func } override def prepare(i: Int): Unit = {} - override def get(): AnyRef = wrapper(func()).asInstanceOf[AnyRef] + override def get(): AnyRef = wrapper(func).asInstanceOf[AnyRef] } -case class HiveGenericUDF( +private[hive] case class HiveGenericUDF( name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) extends Expression with HiveInspectors @@ -180,7 +180,7 @@ case class HiveGenericUDF( while (i < length) { val idx = i deferredObjects(i).asInstanceOf[DeferredObjectAdapter] - .set(() => children(idx).eval(input)) + .set(children(idx).eval(input)) i += 1 } unwrapper(function.evaluate(deferredObjects)) @@ -200,17 +200,14 @@ case class HiveGenericUDF( val setDeferredObjects = childrenEvals.zipWithIndex.zip(children.map(_.dataType)).map { case ((eval, i), dt) => - val funcTerm = ctx.freshName("func") - val ft = CodeGenerator.boxedType(dt) val deferredObjectAdapterClz = classOf[DeferredObjectAdapter].getCanonicalName s""" - |scala.Function0<$ft> $funcTerm = new scala.Function0() { - | @Override - | public $ft apply() { - | return ${eval.isNull} ? null : ${eval.value}; - | } - |}; - |(($deferredObjectAdapterClz) $refTerm.deferredObjects()[$i]).set($funcTerm); + |if (${eval.isNull}) { + | (($deferredObjectAdapterClz) $refTerm.deferredObjects()[$i]).set(null); + |} else { + | (($deferredObjectAdapterClz) $refTerm.deferredObjects()[$i]).set(${eval.value}); + |} + | |""".stripMargin } From cbf44cd35edf640ec8d64dfee0fa1a873f891049 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 16 Jan 2023 11:13:03 +0800 Subject: [PATCH 5/9] clean --- .../src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index aa843add27476..8d6b68eb0b87a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -198,8 +198,8 @@ private[hive] case class HiveGenericUDF( val refTerm = ctx.addReferenceObj("this", this) val childrenEvals = children.map(_.genCode(ctx)) - val setDeferredObjects = childrenEvals.zipWithIndex.zip(children.map(_.dataType)).map { - case ((eval, i), dt) => + val setDeferredObjects = childrenEvals.zipWithIndex.map { + case (eval, i) => val deferredObjectAdapterClz = classOf[DeferredObjectAdapter].getCanonicalName s""" |if (${eval.isNull}) { From bb02b81f2c096c5b750b02b544b27a72fed4877a Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 16 Jan 2023 15:08:51 +0800 Subject: [PATCH 6/9] boxing --- .../src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 8d6b68eb0b87a..889f4fab41910 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -211,7 +211,7 @@ private[hive] case class HiveGenericUDF( |""".stripMargin } - val resultType = CodeGenerator.javaType(dataType) + val resultType = CodeGenerator.boxedType(dataType) ev.copy(code = code""" |${childrenEvals.map(_.code).mkString("\n")} From 203ed914f125389095ca8a3eb48b2ea6dd30a20f Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 16 Jan 2023 15:28:20 +0800 Subject: [PATCH 7/9] boxing --- .../scala/org/apache/spark/sql/hive/hiveUDFs.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 889f4fab41910..d7d87eeb336b9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -212,18 +212,19 @@ private[hive] case class HiveGenericUDF( } val resultType = CodeGenerator.boxedType(dataType) + val resultTerm = ctx.freshName("result") ev.copy(code = code""" |${childrenEvals.map(_.code).mkString("\n")} | |${setDeferredObjects.mkString("\n")} | - |$resultType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; + |$resultType $resultTerm = null; |boolean ${ev.isNull} = false; |try { - | ${ev.value} = ($resultType) $refTerm.unwrapper().apply( + | $resultTerm = ($resultType) $refTerm.unwrapper().apply( | $refTerm.function().evaluate($refTerm.deferredObjects())); - | ${ev.isNull} = ${ev.value} == null; + | ${ev.isNull} = $resultTerm == null; |} catch (Throwable e) { | throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError( | "${funcWrapper.functionClassName}", @@ -231,6 +232,10 @@ private[hive] case class HiveGenericUDF( | "${dataType.catalogString}", | e); |} + |${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; + |if (!${ev.isNull}) { + | ${ev.value} = $resultTerm; + |} |""".stripMargin ) } From 696e785147689c7dd0c0fe2b92f05873d21e2b75 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 16 Jan 2023 16:23:51 +0800 Subject: [PATCH 8/9] update test --- .../scala/org/apache/spark/sql/hive/hiveUDFs.scala | 3 --- .../spark/sql/hive/execution/HiveUDFSuite.scala | 11 +++++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index d7d87eeb336b9..e176229e86fd9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -207,7 +207,6 @@ private[hive] case class HiveGenericUDF( |} else { | (($deferredObjectAdapterClz) $refTerm.deferredObjects()[$i]).set(${eval.value}); |} - | |""".stripMargin } @@ -216,9 +215,7 @@ private[hive] case class HiveGenericUDF( ev.copy(code = code""" |${childrenEvals.map(_.code).mkString("\n")} - | |${setDeferredObjects.mkString("\n")} - | |$resultType $resultTerm = null; |boolean ${ev.isNull} = false; |try { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index e5cfe3cebc787..62ab11796f335 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -716,10 +716,13 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { test("SPARK-42051: HiveGenericUDF Codegen Support") { withUserDefinedFunction("CodeGenHiveGenericUDF" -> false) { sql(s"CREATE FUNCTION CodeGenHiveGenericUDF AS '${classOf[GenericUDFMaskHash].getName}'") - val df = sql("SELECT CodeGenHiveGenericUDF('Spark SQL')") - val plan = df.queryExecution.executedPlan - assert(plan.isInstanceOf[WholeStageCodegenExec]) - checkAnswer(df, Seq(Row("14ab8df5135825bc9f5ff7c30609f02f"))) + withTable("CodeGenHiveGenericUDF") { + sql(s"create table HiveGenericUDFTable as select 'Spark SQL' as v") + val df = sql("SELECT CodeGenHiveGenericUDF(v) from HiveGenericUDFTable") + val plan = df.queryExecution.executedPlan + assert(plan.isInstanceOf[WholeStageCodegenExec]) + checkAnswer(df, Seq(Row("14ab8df5135825bc9f5ff7c30609f02f"))) + } } } } From a11afe207e10cff32cb6170a3fdc12b618b032ac Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 31 Jan 2023 15:16:22 +0800 Subject: [PATCH 9/9] addr comments --- .../org/apache/spark/sql/hive/hiveUDFs.scala | 6 ++--- .../sql/hive/execution/HiveUDFSuite.scala | 22 +++++++++++++++++-- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index e176229e86fd9..32ade60e20d07 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -156,7 +156,7 @@ private[hive] case class HiveGenericUDF( // Visible for codegen @transient - lazy val unwrapper = unwrapperFor(returnInspector) + lazy val unwrapper: Any => Any = unwrapperFor(returnInspector) @transient private lazy val isUDFDeterministic = { @@ -166,8 +166,8 @@ private[hive] case class HiveGenericUDF( // Visible for codegen @transient - lazy val deferredObjects = argumentInspectors.zip(children).map { case (inspect, child) => - new DeferredObjectAdapter(inspect, child.dataType) + lazy val deferredObjects: Array[DeferredObject] = argumentInspectors.zip(children).map { + case (inspect, child) => new DeferredObjectAdapter(inspect, child.dataType) }.toArray[DeferredObject] override lazy val dataType: DataType = inspectorToDataType(returnInspector) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 62ab11796f335..baa25843d48b6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectIns import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory import org.apache.hadoop.io.{LongWritable, Writable} -import org.apache.spark.{SparkFiles, TestUtils} +import org.apache.spark.{SparkException, SparkFiles, TestUtils} import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.execution.WholeStageCodegenExec @@ -716,7 +716,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { test("SPARK-42051: HiveGenericUDF Codegen Support") { withUserDefinedFunction("CodeGenHiveGenericUDF" -> false) { sql(s"CREATE FUNCTION CodeGenHiveGenericUDF AS '${classOf[GenericUDFMaskHash].getName}'") - withTable("CodeGenHiveGenericUDF") { + withTable("HiveGenericUDFTable") { sql(s"create table HiveGenericUDFTable as select 'Spark SQL' as v") val df = sql("SELECT CodeGenHiveGenericUDF(v) from HiveGenericUDFTable") val plan = df.queryExecution.executedPlan @@ -725,6 +725,24 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } } } + + test("SPARK-42051: HiveGenericUDF Codegen Support w/ execution failure") { + withUserDefinedFunction("CodeGenHiveGenericUDF" -> false) { + sql(s"CREATE FUNCTION CodeGenHiveGenericUDF AS '${classOf[GenericUDFAssertTrue].getName}'") + withTable("HiveGenericUDFTable") { + sql(s"create table HiveGenericUDFTable as select false as v") + val df = sql("SELECT CodeGenHiveGenericUDF(v) from HiveGenericUDFTable") + val e = intercept[SparkException](df.collect()).getCause.asInstanceOf[SparkException] + checkError( + e, + "FAILED_EXECUTE_UDF", + parameters = Map( + "functionName" -> s"${classOf[GenericUDFAssertTrue].getName}", + "signature" -> "boolean", + "result" -> "void")) + } + } + } } class TestPair(x: Int, y: Int) extends Writable with Serializable {