From e553f10c67f129e59c526d6149e60c77348f546a Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Wed, 8 Apr 2015 01:41:17 +0900 Subject: [PATCH 01/18] Support List as a return type in Hive UDF --- .../spark/sql/hive/HiveInspectors.scala | 27 ++++++++++++++++- .../sql/hive/execution/UDFToListInt.java | 29 +++++++++++++++++++ .../sql/hive/execution/UDFToListString.java | 29 +++++++++++++++++++ .../sql/hive/execution/HiveUdfSuite.scala | 26 +++++++++++++++++ 4 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java create mode 100644 sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 7c7666f6e4b7c..fd46dce85f3a9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -22,6 +22,8 @@ import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.primitive._ import org.apache.hadoop.hive.serde2.{io => hiveIo} import org.apache.hadoop.{io => hadoopIo} +import org.apache.spark.Logging +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.DateUtils @@ -170,7 +172,7 @@ import scala.collection.JavaConversions._ * e.g. date_add(printf("%s-%s-%s", a,b,c), 3) * We don't need to unwrap the data for printf and wrap it again and passes in data_add */ -private[hive] trait HiveInspectors { +private[hive] trait HiveInspectors extends Logging { def javaClassToDataType(clz: Class[_]): DataType = clz match { // writable @@ -214,8 +216,16 @@ private[hive] trait HiveInspectors { case c: Class[_] if c.isArray => ArrayType(javaClassToDataType(c.getComponentType)) + // list type + case c: Class[_] if c == classOf[java.util.List[java.lang.Object]] => + logWarning("Failed to catch a correct component type in List<> because of type erasure," + + " so you need to handle it correctly by yourself") + ArrayType(ErasedType) + // Hive seems to return this for struct types? case c: Class[_] if c == classOf[java.lang.Object] => NullType + + case c => throw new HiveDataTypeException("Unknown java type: " + c) } /** @@ -673,3 +683,18 @@ private[hive] trait HiveInspectors { } } } + +/** + * :: DeveloperApi :: + * This represents an erased type because of type erasure in JVM. + */ +@DeveloperApi +class ErasedType private() extends DataType { + override def defaultSize: Int = 1 + private[spark] override def asNullable: ErasedType = this +} + +case object ErasedType extends ErasedType + +/** The exception thrown from the [[HiveInspectors]]. */ +private[hive] class HiveDataTypeException(message: String) extends Exception(message) diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java new file mode 100644 index 0000000000000..67576a72f1980 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution; + +import org.apache.hadoop.hive.ql.exec.UDF; + +import java.util.Arrays; +import java.util.List; + +public class UDFToListInt extends UDF { + public List evaluate(Object o) { + return Arrays.asList(1, 2, 3); + } +} diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java new file mode 100644 index 0000000000000..d698f46f1f854 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution; + +import org.apache.hadoop.hive.ql.exec.UDF; + +import java.util.Arrays; +import java.util.List; + +public class UDFToListString extends UDF { + public List evaluate(Object o) { + return Arrays.asList("data1", "data2", "data3"); + } +} \ No newline at end of file diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index 7f49eac490572..b66b2e77558a1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -133,6 +133,32 @@ class HiveUdfSuite extends QueryTest { TestHive.reset() } + test("UDFToListString") { + val testData = TestHive.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + testData.registerTempTable("inputTable") + + sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'") + checkAnswer( + sql("SELECT testUDFToListString(s) FROM inputTable"), //.collect(), + Seq(Row("data1" :: "data2" :: "data3" :: Nil))) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString") + + TestHive.reset() + } + + test("UDFToListInt") { + val testData = TestHive.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + testData.registerTempTable("inputTable") + + sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'") + checkAnswer( + sql("SELECT testUDFToListInt(s) FROM inputTable"), //.collect(), + Seq(Row(1 :: 2 :: 3 :: Nil))) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt") + + TestHive.reset() + } + test("UDFListListInt") { val testData = TestHive.sparkContext.parallelize( ListListIntCaseClass(Nil) :: From e21ce7e33139b96581dbd37785f99854e60379fd Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Wed, 8 Apr 2015 11:18:22 +0900 Subject: [PATCH 02/18] Add a blank line at the end of UDFToListString --- .../org/apache/spark/sql/hive/execution/UDFToListString.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java index d698f46f1f854..f02395cbba88b 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java @@ -26,4 +26,4 @@ public class UDFToListString extends UDF { public List evaluate(Object o) { return Arrays.asList("data1", "data2", "data3"); } -} \ No newline at end of file +} From 94064169380d129a41574e7ee3ccc994532f71f0 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Wed, 15 Apr 2015 01:46:44 +0900 Subject: [PATCH 03/18] Apply review comments --- .../org/apache/spark/sql/hive/HiveInspectors.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index fd46dce85f3a9..244a4994fe944 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -28,6 +28,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.types +import org.apache.spark.sql.{AnalysisException, types} import org.apache.spark.sql.types._ /* Implicit conversions */ @@ -218,14 +219,14 @@ private[hive] trait HiveInspectors extends Logging { // list type case c: Class[_] if c == classOf[java.util.List[java.lang.Object]] => - logWarning("Failed to catch a correct component type in List<> because of type erasure," + - " so you need to handle it correctly by yourself") + logWarning("Failed to catch a component type in List<> because of type erasure in JVM," + + " so you need to cast it into the correct type by yourself") ArrayType(ErasedType) // Hive seems to return this for struct types? case c: Class[_] if c == classOf[java.lang.Object] => NullType - case c => throw new HiveDataTypeException("Unknown java type: " + c) + case c => throw new AnalysisException(s"Unsupported java type $c") } /** @@ -695,6 +696,3 @@ class ErasedType private() extends DataType { } case object ErasedType extends ErasedType - -/** The exception thrown from the [[HiveInspectors]]. */ -private[hive] class HiveDataTypeException(message: String) extends Exception(message) From f965c34bcea8efa6b58443ef31c4d857b5feb09e Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Wed, 15 Apr 2015 03:04:31 +0900 Subject: [PATCH 04/18] Fix code-style errors --- .../org/apache/spark/sql/hive/execution/HiveUdfSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index b66b2e77558a1..f6d1b6c12a93a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -139,7 +139,7 @@ class HiveUdfSuite extends QueryTest { sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'") checkAnswer( - sql("SELECT testUDFToListString(s) FROM inputTable"), //.collect(), + sql("SELECT testUDFToListString(s) FROM inputTable"), Seq(Row("data1" :: "data2" :: "data3" :: Nil))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString") @@ -152,7 +152,7 @@ class HiveUdfSuite extends QueryTest { sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'") checkAnswer( - sql("SELECT testUDFToListInt(s) FROM inputTable"), //.collect(), + sql("SELECT testUDFToListInt(s) FROM inputTable"), Seq(Row(1 :: 2 :: 3 :: Nil))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt") From 1c7b9d1aae14de43ca9bf6e3414a45f1104e5043 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 17 Apr 2015 09:51:14 +0900 Subject: [PATCH 05/18] Remove a new type --- .../org/apache/spark/sql/hive/HiveInspectors.scala | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 244a4994fe944..ef60ba444fd7a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -221,7 +221,7 @@ private[hive] trait HiveInspectors extends Logging { case c: Class[_] if c == classOf[java.util.List[java.lang.Object]] => logWarning("Failed to catch a component type in List<> because of type erasure in JVM," + " so you need to cast it into the correct type by yourself") - ArrayType(ErasedType) + ArrayType(NullType) // Hive seems to return this for struct types? case c: Class[_] if c == classOf[java.lang.Object] => NullType @@ -684,15 +684,3 @@ private[hive] trait HiveInspectors extends Logging { } } } - -/** - * :: DeveloperApi :: - * This represents an erased type because of type erasure in JVM. - */ -@DeveloperApi -class ErasedType private() extends DataType { - override def defaultSize: Int = 1 - private[spark] override def asNullable: ErasedType = this -} - -case object ErasedType extends ErasedType From a4887127834fc73063851fd57f095ff212f9b483 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 15 May 2015 13:36:50 +0900 Subject: [PATCH 06/18] Add StringToUtf8 to comvert String into UTF8String --- .../apache/spark/sql/hive/execution/HiveUdfSuite.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index f6d1b6c12a93a..31a734f90a9e8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory} import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats} import org.apache.hadoop.io.Writable +import org.apache.spark.sql.types.UTF8String import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.hive.test.TestHive @@ -137,10 +138,15 @@ class HiveUdfSuite extends QueryTest { val testData = TestHive.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() testData.registerTempTable("inputTable") + /** Converts $"..." into UTF8String(...). */ + implicit class StringToUtf8(val sc: StringContext) { + def u(args: Any*): UTF8String = UTF8String(sc.s(args :_*)) + } + sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'") checkAnswer( sql("SELECT testUDFToListString(s) FROM inputTable"), - Seq(Row("data1" :: "data2" :: "data3" :: Nil))) + Seq(Row(u"data1" :: u"data2" :: u"data3" :: Nil))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString") TestHive.reset() From 21e8763ccba103187561c64a2fa6e38286ff13b1 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 15 May 2015 16:18:40 +0900 Subject: [PATCH 07/18] Add TODO comments in UDFToListString of HiveUdfSuite --- .../org/apache/spark/sql/hive/execution/HiveUdfSuite.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index 31a734f90a9e8..d1338057d8ffb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -138,7 +138,11 @@ class HiveUdfSuite extends QueryTest { val testData = TestHive.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() testData.registerTempTable("inputTable") - /** Converts $"..." into UTF8String(...). */ + /** + * Converts $"..." into UTF8String(...). + * TODO: Catalyst (e.g., CatalystTypeConverters) cannot convert UTF8String + * into String correctly because of JVM type erasure. + */ implicit class StringToUtf8(val sc: StringContext) { def u(args: Any*): UTF8String = UTF8String(sc.s(args :_*)) } From 1e823167986113cc264be13bd22c631eeb1452e4 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Sat, 16 May 2015 11:58:46 +0900 Subject: [PATCH 08/18] Apply comments --- .../main/scala/org/apache/spark/sql/hive/HiveInspectors.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index ef60ba444fd7a..0bbab04466f57 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -218,7 +218,7 @@ private[hive] trait HiveInspectors extends Logging { case c: Class[_] if c.isArray => ArrayType(javaClassToDataType(c.getComponentType)) // list type - case c: Class[_] if c == classOf[java.util.List[java.lang.Object]] => + case c: Class[_] if c == classOf[java.util.List[_]] => logWarning("Failed to catch a component type in List<> because of type erasure in JVM," + " so you need to cast it into the correct type by yourself") ArrayType(NullType) From ee232db94ffcb08b8cd33b94d03543bb4a663b60 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Wed, 8 Apr 2015 01:41:17 +0900 Subject: [PATCH 09/18] Support List as a return type in Hive UDF --- .../spark/sql/hive/HiveInspectors.scala | 27 ++++++++++++++++- .../sql/hive/execution/UDFToListInt.java | 29 +++++++++++++++++++ .../sql/hive/execution/UDFToListString.java | 29 +++++++++++++++++++ .../sql/hive/execution/HiveUDFSuite.scala | 26 +++++++++++++++++ 4 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java create mode 100644 sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index a6b8ead577fb5..965d7659dbe32 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -23,6 +23,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.{StructField => HiveStructF import org.apache.hadoop.hive.serde2.typeinfo.{DecimalTypeInfo, TypeInfoFactory} import org.apache.hadoop.hive.serde2.{io => hiveIo} import org.apache.hadoop.{io => hadoopIo} +import org.apache.spark.Logging +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -172,7 +174,7 @@ import scala.collection.JavaConversions._ * e.g. date_add(printf("%s-%s-%s", a,b,c), 3) * We don't need to unwrap the data for printf and wrap it again and passes in data_add */ -private[hive] trait HiveInspectors { +private[hive] trait HiveInspectors extends Logging { def javaClassToDataType(clz: Class[_]): DataType = clz match { // writable @@ -216,8 +218,16 @@ private[hive] trait HiveInspectors { case c: Class[_] if c.isArray => ArrayType(javaClassToDataType(c.getComponentType)) + // list type + case c: Class[_] if c == classOf[java.util.List[java.lang.Object]] => + logWarning("Failed to catch a correct component type in List<> because of type erasure," + + " so you need to handle it correctly by yourself") + ArrayType(ErasedType) + // Hive seems to return this for struct types? case c: Class[_] if c == classOf[java.lang.Object] => NullType + + case c => throw new HiveDataTypeException("Unknown java type: " + c) } /** @@ -828,3 +838,18 @@ private[hive] trait HiveInspectors { } } } + +/** + * :: DeveloperApi :: + * This represents an erased type because of type erasure in JVM. + */ +@DeveloperApi +class ErasedType private() extends DataType { + override def defaultSize: Int = 1 + private[spark] override def asNullable: ErasedType = this +} + +case object ErasedType extends ErasedType + +/** The exception thrown from the [[HiveInspectors]]. */ +private[hive] class HiveDataTypeException(message: String) extends Exception(message) diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java new file mode 100644 index 0000000000000..67576a72f1980 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListInt.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution; + +import org.apache.hadoop.hive.ql.exec.UDF; + +import java.util.Arrays; +import java.util.List; + +public class UDFToListInt extends UDF { + public List evaluate(Object o) { + return Arrays.asList(1, 2, 3); + } +} diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java new file mode 100644 index 0000000000000..d698f46f1f854 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution; + +import org.apache.hadoop.hive.ql.exec.UDF; + +import java.util.Arrays; +import java.util.List; + +public class UDFToListString extends UDF { + public List evaluate(Object o) { + return Arrays.asList("data1", "data2", "data3"); + } +} \ No newline at end of file diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 56b0bef1d0571..e2e0f45fa7faa 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -133,6 +133,32 @@ class HiveUDFSuite extends QueryTest { TestHive.reset() } + test("UDFToListString") { + val testData = TestHive.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + testData.registerTempTable("inputTable") + + sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'") + checkAnswer( + sql("SELECT testUDFToListString(s) FROM inputTable"), //.collect(), + Seq(Row("data1" :: "data2" :: "data3" :: Nil))) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString") + + TestHive.reset() + } + + test("UDFToListInt") { + val testData = TestHive.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + testData.registerTempTable("inputTable") + + sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'") + checkAnswer( + sql("SELECT testUDFToListInt(s) FROM inputTable"), //.collect(), + Seq(Row(1 :: 2 :: 3 :: Nil))) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt") + + TestHive.reset() + } + test("UDFListListInt") { val testData = TestHive.sparkContext.parallelize( ListListIntCaseClass(Nil) :: From 93e3d4e7e1fc93e95ce09e01318e3f6fb7639d8b Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Wed, 8 Apr 2015 11:18:22 +0900 Subject: [PATCH 10/18] Add a blank line at the end of UDFToListString --- .../org/apache/spark/sql/hive/execution/UDFToListString.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java index d698f46f1f854..f02395cbba88b 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFToListString.java @@ -26,4 +26,4 @@ public class UDFToListString extends UDF { public List evaluate(Object o) { return Arrays.asList("data1", "data2", "data3"); } -} \ No newline at end of file +} From 6984bf4067ef8d98c21097cd2b2265aece3b8a87 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Wed, 15 Apr 2015 01:46:44 +0900 Subject: [PATCH 11/18] Apply review comments --- .../org/apache/spark/sql/hive/HiveInspectors.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 965d7659dbe32..65855864aa1e6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -29,6 +29,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types +import org.apache.spark.sql.{AnalysisException, types} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -220,14 +221,14 @@ private[hive] trait HiveInspectors extends Logging { // list type case c: Class[_] if c == classOf[java.util.List[java.lang.Object]] => - logWarning("Failed to catch a correct component type in List<> because of type erasure," + - " so you need to handle it correctly by yourself") + logWarning("Failed to catch a component type in List<> because of type erasure in JVM," + + " so you need to cast it into the correct type by yourself") ArrayType(ErasedType) // Hive seems to return this for struct types? case c: Class[_] if c == classOf[java.lang.Object] => NullType - case c => throw new HiveDataTypeException("Unknown java type: " + c) + case c => throw new AnalysisException(s"Unsupported java type $c") } /** @@ -850,6 +851,3 @@ class ErasedType private() extends DataType { } case object ErasedType extends ErasedType - -/** The exception thrown from the [[HiveInspectors]]. */ -private[hive] class HiveDataTypeException(message: String) extends Exception(message) From 7f812fd8f6d07f6779011e2e98b631733264f2a9 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Wed, 15 Apr 2015 03:04:31 +0900 Subject: [PATCH 12/18] Fix code-style errors --- .../org/apache/spark/sql/hive/execution/HiveUDFSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index e2e0f45fa7faa..24830f5aa3592 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -139,7 +139,7 @@ class HiveUDFSuite extends QueryTest { sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'") checkAnswer( - sql("SELECT testUDFToListString(s) FROM inputTable"), //.collect(), + sql("SELECT testUDFToListString(s) FROM inputTable"), Seq(Row("data1" :: "data2" :: "data3" :: Nil))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString") @@ -152,7 +152,7 @@ class HiveUDFSuite extends QueryTest { sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'") checkAnswer( - sql("SELECT testUDFToListInt(s) FROM inputTable"), //.collect(), + sql("SELECT testUDFToListInt(s) FROM inputTable"), Seq(Row(1 :: 2 :: 3 :: Nil))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt") From af61f2ecc821cede51a0bb446ba67230d39b3f10 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 17 Apr 2015 09:51:14 +0900 Subject: [PATCH 13/18] Remove a new type --- .../org/apache/spark/sql/hive/HiveInspectors.scala | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 65855864aa1e6..7e722f9c4c24d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -223,7 +223,7 @@ private[hive] trait HiveInspectors extends Logging { case c: Class[_] if c == classOf[java.util.List[java.lang.Object]] => logWarning("Failed to catch a component type in List<> because of type erasure in JVM," + " so you need to cast it into the correct type by yourself") - ArrayType(ErasedType) + ArrayType(NullType) // Hive seems to return this for struct types? case c: Class[_] if c == classOf[java.lang.Object] => NullType @@ -839,15 +839,3 @@ private[hive] trait HiveInspectors extends Logging { } } } - -/** - * :: DeveloperApi :: - * This represents an erased type because of type erasure in JVM. - */ -@DeveloperApi -class ErasedType private() extends DataType { - override def defaultSize: Int = 1 - private[spark] override def asNullable: ErasedType = this -} - -case object ErasedType extends ErasedType From fdb2ae4a01b6cea154311632078f8fb3026ce945 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 15 May 2015 13:36:50 +0900 Subject: [PATCH 14/18] Add StringToUtf8 to comvert String into UTF8String --- .../apache/spark/sql/hive/execution/HiveUDFSuite.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 24830f5aa3592..4e414be412109 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory} import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats} import org.apache.hadoop.io.Writable +import org.apache.spark.sql.types.UTF8String import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.hive.test.TestHive @@ -137,10 +138,15 @@ class HiveUDFSuite extends QueryTest { val testData = TestHive.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() testData.registerTempTable("inputTable") + /** Converts $"..." into UTF8String(...). */ + implicit class StringToUtf8(val sc: StringContext) { + def u(args: Any*): UTF8String = UTF8String(sc.s(args :_*)) + } + sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'") checkAnswer( sql("SELECT testUDFToListString(s) FROM inputTable"), - Seq(Row("data1" :: "data2" :: "data3" :: Nil))) + Seq(Row(u"data1" :: u"data2" :: u"data3" :: Nil))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString") TestHive.reset() From 7114a4735a4ccbd1211b10fa757f0e27012ef96d Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 15 May 2015 16:18:40 +0900 Subject: [PATCH 15/18] Add TODO comments in UDFToListString of HiveUdfSuite --- .../org/apache/spark/sql/hive/execution/HiveUDFSuite.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 4e414be412109..5c6e843ae8509 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -138,7 +138,11 @@ class HiveUDFSuite extends QueryTest { val testData = TestHive.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() testData.registerTempTable("inputTable") - /** Converts $"..." into UTF8String(...). */ + /** + * Converts $"..." into UTF8String(...). + * TODO: Catalyst (e.g., CatalystTypeConverters) cannot convert UTF8String + * into String correctly because of JVM type erasure. + */ implicit class StringToUtf8(val sc: StringContext) { def u(args: Any*): UTF8String = UTF8String(sc.s(args :_*)) } From 2844a8e6c4cbfafafadff7e072a971c6656ae113 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Sat, 16 May 2015 11:58:46 +0900 Subject: [PATCH 16/18] Apply comments --- .../main/scala/org/apache/spark/sql/hive/HiveInspectors.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 7e722f9c4c24d..d86ad41123576 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -220,7 +220,7 @@ private[hive] trait HiveInspectors extends Logging { case c: Class[_] if c.isArray => ArrayType(javaClassToDataType(c.getComponentType)) // list type - case c: Class[_] if c == classOf[java.util.List[java.lang.Object]] => + case c: Class[_] if c == classOf[java.util.List[_]] => logWarning("Failed to catch a component type in List<> because of type erasure in JVM," + " so you need to cast it into the correct type by yourself") ArrayType(NullType) From 92ed7a6710ced76823365f493f753bc6ca947b30 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Tue, 7 Jul 2015 08:11:33 +0900 Subject: [PATCH 17/18] Throw an exception when java list type used --- .../spark/sql/hive/HiveInspectors.scala | 19 +++++------- .../sql/hive/execution/HiveUDFSuite.scala | 31 ++++++------------- 2 files changed, 18 insertions(+), 32 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index d86ad41123576..a925e18ee145e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -24,13 +24,10 @@ import org.apache.hadoop.hive.serde2.typeinfo.{DecimalTypeInfo, TypeInfoFactory} import org.apache.hadoop.hive.serde2.{io => hiveIo} import org.apache.hadoop.{io => hadoopIo} import org.apache.spark.Logging -import org.apache.spark.annotation.DeveloperApi - import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.types -import org.apache.spark.sql.{AnalysisException, types} import org.apache.spark.sql.types._ +import org.apache.spark.sql.{AnalysisException, types} import org.apache.spark.unsafe.types.UTF8String /* Implicit conversions */ @@ -219,15 +216,15 @@ private[hive] trait HiveInspectors extends Logging { case c: Class[_] if c.isArray => ArrayType(javaClassToDataType(c.getComponentType)) - // list type - case c: Class[_] if c == classOf[java.util.List[_]] => - logWarning("Failed to catch a component type in List<> because of type erasure in JVM," + - " so you need to cast it into the correct type by yourself") - ArrayType(NullType) - // Hive seems to return this for struct types? case c: Class[_] if c == classOf[java.lang.Object] => NullType + // java list type unsupported + case c: Class[_] if c == classOf[java.util.List[_]] => + throw new AnalysisException( + "List type in java is unsupported because " + + "JVM type erasure makes spark fail to catch a component type in List<>") + case c => throw new AnalysisException(s"Unsupported java type $c") } @@ -804,8 +801,8 @@ private[hive] trait HiveInspectors extends Logging { } implicit class typeInfoConversions(dt: DataType) { + import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory._ import org.apache.hadoop.hive.serde2.typeinfo._ - import TypeInfoFactory._ private def decimalTypeInfo(decimalType: DecimalType): TypeInfo = decimalType match { case DecimalType.Fixed(precision, scale) => new DecimalTypeInfo(precision, scale) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 5c6e843ae8509..4a664cc3df20b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -22,16 +22,14 @@ import java.util import java.util.Properties import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hive.ql.udf.generic.{GenericUDAFAverage, GenericUDF} import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject +import org.apache.hadoop.hive.ql.udf.generic.{GenericUDAFAverage, GenericUDF} import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory} import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats} import org.apache.hadoop.io.Writable -import org.apache.spark.sql.types.UTF8String -import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.hive.test.TestHive - +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.util.Utils import scala.collection.JavaConversions._ @@ -49,8 +47,8 @@ case class ListStringCaseClass(l: Seq[String]) */ class HiveUDFSuite extends QueryTest { - import TestHive.{udf, sql} - import TestHive.implicits._ + import org.apache.spark.sql.hive.test.TestHive.implicits._ + import org.apache.spark.sql.hive.test.TestHive.{sql, udf} test("spark sql udf test that returns a struct") { udf.register("getStruct", (_: Int) => Fields(1, 2, 3, 4, 5)) @@ -138,19 +136,10 @@ class HiveUDFSuite extends QueryTest { val testData = TestHive.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() testData.registerTempTable("inputTable") - /** - * Converts $"..." into UTF8String(...). - * TODO: Catalyst (e.g., CatalystTypeConverters) cannot convert UTF8String - * into String correctly because of JVM type erasure. - */ - implicit class StringToUtf8(val sc: StringContext) { - def u(args: Any*): UTF8String = UTF8String(sc.s(args :_*)) - } - sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'") - checkAnswer( - sql("SELECT testUDFToListString(s) FROM inputTable"), - Seq(Row(u"data1" :: u"data2" :: u"data3" :: Nil))) + intercept[AnalysisException] { + sql("SELECT testUDFToListString(s) FROM inputTable") + } sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString") TestHive.reset() @@ -161,9 +150,9 @@ class HiveUDFSuite extends QueryTest { testData.registerTempTable("inputTable") sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'") - checkAnswer( - sql("SELECT testUDFToListInt(s) FROM inputTable"), - Seq(Row(1 :: 2 :: 3 :: Nil))) + intercept[AnalysisException] { + sql("SELECT testUDFToListInt(s) FROM inputTable") + } sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt") TestHive.reset() From 1c3df2a4fdb9aa44c1d100e047bf1a267f9ad0ca Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Tue, 7 Jul 2015 10:12:49 +0900 Subject: [PATCH 18/18] Fix comments --- .../apache/spark/sql/hive/execution/HiveUDFSuite.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 979b431dbf1f5..44686204c2af7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -138,9 +138,11 @@ class HiveUDFSuite extends QueryTest { testData.registerTempTable("inputTable") sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'") - intercept[AnalysisException] { + val errMsg = intercept[AnalysisException] { sql("SELECT testUDFToListString(s) FROM inputTable") } + assert(errMsg.getMessage === "List type in java is unsupported because " + + "JVM type erasure makes spark fail to catch a component type in List<>;") sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString") TestHive.reset() @@ -151,9 +153,11 @@ class HiveUDFSuite extends QueryTest { testData.registerTempTable("inputTable") sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'") - intercept[AnalysisException] { + val errMsg = intercept[AnalysisException] { sql("SELECT testUDFToListInt(s) FROM inputTable") } + assert(errMsg.getMessage === "List type in java is unsupported because " + + "JVM type erasure makes spark fail to catch a component type in List<>;") sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt") TestHive.reset()