From 62a39de5d132896b0132024e46f26e4853a61734 Mon Sep 17 00:00:00 2001 From: nivo091 Date: Thu, 28 Feb 2019 19:32:56 +0530 Subject: [PATCH 1/6] [SPARK-26560][SQL]:Repeat select on HiveUDF fails --- .../spark/sql/hive/HiveSessionCatalog.scala | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 3f0a9f222feb2..682df2dffa1c4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -66,10 +66,11 @@ private[sql] class HiveSessionCatalog( name: String, clazz: Class[_], input: Seq[Expression]): Expression = { - - Try(super.makeFunctionExpression(name, clazz, input)).getOrElse { - var udfExpr: Option[Expression] = None - try { + val originalClassLoader = Thread.currentThread().getContextClassLoader() + Thread.currentThread().setContextClassLoader(clazz.getClassLoader) + try { + Try(super.makeFunctionExpression(name, clazz, input)).getOrElse { + var udfExpr: Option[Expression] = None // When we instantiate hive UDF wrapper class, we may throw exception if the input // expressions don't satisfy the hive UDF, such as type mismatch, input number // mismatch, etc. Here we catch the exception and throw AnalysisException instead. @@ -93,23 +94,25 @@ private[sql] class HiveSessionCatalog( udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input)) udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types. 
} - } catch { - case NonFatal(e) => - val noHandlerMsg = s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e" - val errorMsg = - if (classOf[GenericUDTF].isAssignableFrom(clazz)) { - s"$noHandlerMsg\nPlease make sure your function overrides " + - "`public StructObjectInspector initialize(ObjectInspector[] args)`." - } else { - noHandlerMsg - } - val analysisException = new AnalysisException(errorMsg) - analysisException.setStackTrace(e.getStackTrace) - throw analysisException - } - udfExpr.getOrElse { - throw new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}'") + udfExpr.getOrElse { + throw new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}'") + } } + } catch { + case NonFatal(e) => + val noHandlerMsg = s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e" + val errorMsg = + if (classOf[GenericUDTF].isAssignableFrom(clazz)) { + s"$noHandlerMsg\nPlease make sure your function overrides " + + "`public StructObjectInspector initialize(ObjectInspector[] args)`." 
+ } else { + noHandlerMsg + } + val analysisException = new AnalysisException(errorMsg) + analysisException.setStackTrace(e.getStackTrace) + throw analysisException + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader) } } From 02188e07a9d74ef7422a23db4351e49faa0006b2 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Fri, 27 Dec 2019 21:53:56 +0900 Subject: [PATCH 2/6] Add UT & relevant jar, change to cleaner fix --- .../spark/sql/hive/HiveSessionCatalog.scala | 80 +++++++++--------- sql/hive/src/test/TestUDTF-dynamicload.jar | Bin 0 -> 7462 bytes .../sql/hive/execution/SQLQuerySuite.scala | 50 +++++++++++ 3 files changed, 90 insertions(+), 40 deletions(-) create mode 100644 sql/hive/src/test/TestUDTF-dynamicload.jar diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 682df2dffa1c4..766a25b219909 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -66,53 +66,53 @@ private[sql] class HiveSessionCatalog( name: String, clazz: Class[_], input: Seq[Expression]): Expression = { - val originalClassLoader = Thread.currentThread().getContextClassLoader() - Thread.currentThread().setContextClassLoader(clazz.getClassLoader) - try { + // Current thread context classloader may not be the one that loaded the class. Need to switch + // context classloader to initialize instance properly. + Utils.withContextClassLoader(clazz.getClassLoader) { Try(super.makeFunctionExpression(name, clazz, input)).getOrElse { var udfExpr: Option[Expression] = None - // When we instantiate hive UDF wrapper class, we may throw exception if the input - // expressions don't satisfy the hive UDF, such as type mismatch, input number - // mismatch, etc. Here we catch the exception and throw AnalysisException instead. 
- if (classOf[UDF].isAssignableFrom(clazz)) { - udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input)) - udfExpr.get.dataType // Force it to check input data types. - } else if (classOf[GenericUDF].isAssignableFrom(clazz)) { - udfExpr = Some(HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), input)) - udfExpr.get.dataType // Force it to check input data types. - } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) { - udfExpr = Some(HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), input)) - udfExpr.get.dataType // Force it to check input data types. - } else if (classOf[UDAF].isAssignableFrom(clazz)) { - udfExpr = Some(HiveUDAFFunction( - name, - new HiveFunctionWrapper(clazz.getName), - input, - isUDAFBridgeRequired = true)) - udfExpr.get.dataType // Force it to check input data types. - } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) { - udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input)) - udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types. + try { + // When we instantiate hive UDF wrapper class, we may throw exception if the input + // expressions don't satisfy the hive UDF, such as type mismatch, input number + // mismatch, etc. Here we catch the exception and throw AnalysisException instead. + if (classOf[UDF].isAssignableFrom(clazz)) { + udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input)) + udfExpr.get.dataType // Force it to check input data types. + } else if (classOf[GenericUDF].isAssignableFrom(clazz)) { + udfExpr = Some(HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), input)) + udfExpr.get.dataType // Force it to check input data types. + } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) { + udfExpr = Some(HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), input)) + udfExpr.get.dataType // Force it to check input data types. 
+ } else if (classOf[UDAF].isAssignableFrom(clazz)) { + udfExpr = Some(HiveUDAFFunction( + name, + new HiveFunctionWrapper(clazz.getName), + input, + isUDAFBridgeRequired = true)) + udfExpr.get.dataType // Force it to check input data types. + } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) { + udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input)) + udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types. + } + } catch { + case NonFatal(e) => + val noHandlerMsg = s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e" + val errorMsg = + if (classOf[GenericUDTF].isAssignableFrom(clazz)) { + s"$noHandlerMsg\nPlease make sure your function overrides " + + "`public StructObjectInspector initialize(ObjectInspector[] args)`." + } else { + noHandlerMsg + } + val analysisException = new AnalysisException(errorMsg) + analysisException.setStackTrace(e.getStackTrace) + throw analysisException } udfExpr.getOrElse { throw new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}'") } } - } catch { - case NonFatal(e) => - val noHandlerMsg = s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e" - val errorMsg = - if (classOf[GenericUDTF].isAssignableFrom(clazz)) { - s"$noHandlerMsg\nPlease make sure your function overrides " + - "`public StructObjectInspector initialize(ObjectInspector[] args)`." 
- } else { - noHandlerMsg - } - val analysisException = new AnalysisException(errorMsg) - analysisException.setStackTrace(e.getStackTrace) - throw analysisException - } finally { - Thread.currentThread().setContextClassLoader(originalClassLoader) } } diff --git a/sql/hive/src/test/TestUDTF-dynamicload.jar b/sql/hive/src/test/TestUDTF-dynamicload.jar new file mode 100644 index 0000000000000000000000000000000000000000..b73b17d5c7880a56981288d0ae2041b0970b14f3 GIT binary patch literal 7462 zcmbW6bzD^Y*2ahK?(UKfX=#S;?oNRjIz&1Iqy%XYDQN~lkPf9pngNmSPD!a3&+*)= z_gv38_ucz5Gy9L(-)F7$*?aB%TaUUD96T!E+Xi}-srK8!_b+$=IRNBl$El-@0)V4E zLNHA_K`{07K?A_RpTGbB@c4KCQ~%}JXy4Da09#nu+x$NvxIaVeEv!Kx@c)xV^j}%_ z4qpE>IHBU7>qGTxpcTl)!_C3+_ed-Mu5{r4C_Me&gFUT1Y=5WI{~cZCKf-4Fhjbes z3umy?_u{nwTIJhifDHJS0}gG(I0ifbV2lC)NdJ#2ayBkDZVpyDGTK0Ckf)0WFNc+r zg}eK2*WNEuEYy9dMW#a>po-fFsa$2WZ>e;K(!=%_epX^mmXg-eYKPsl?dNg%E+DTPpqR z^&Za@Hici|PZN8?H1Lm&h;n*h!gRpz$J&QChohRhD4TIc`;U?u*5n5Kcsy{Urn3}4~(Bd1dB3P3*D-_w^v`l9uX67M| zo*R zFYYrA>hw&JS&87;WJVQP*2?Is-0LaAaBMNz>pB*^g}iNeea^LwXgWw)FOVATgq(Kz^vQl@*$r}#J(ZMk)mx1?fFI8j*g>g1+iaQ{LsdSjJz{Fr_lFA zaH}5uOl{&kR(-;O9p~NWjtVQnXyX@;R$kZ1#d!^rOsEPv$?@-dK;;eeg`ww^jZTwc zhI%tqno><15)stXb>hzmpiwXU$8I7eTL6Vs_XZ>jv2;+V#spr!Xw730h2{h6U-{J{ z+qNImj|wS%;&J5)p}SZ&zc@xK$f3y|x{CV9dwi9_?2|cIBR`hTuqevrALor+lMx~q z>5anEEO(}zcPz3rQHyplKax~L*OD_i`8aDcOpXyaBK-;uFKJ~HpC!B7_bUmL@pPDmRNAgKGem8lX{Xq~kAWkofb9tci!amaHpDIwe)lm2WcBLWswY9n`W z7B;3;j9$ncl=Y(m*|vg;Zkd^EJ0TLKE{vG5{0Nv7qy?JjNu&8s3%F9`7Juf zjOC-ObkNO3T6nCsmC-n9Lm@tfwjX_0*lfgUulVj{P-JEJelR{S&}g`9bEKW#s9sY4 z(*qz&5kgO_|HnL8th2)w!=7-> z^f5w{k$2mBPFGcIT`1p5{Cl(i_NGub(bp#rQd@!xti8YL zsubN<*qN$XKg!OwngXp`ma&&J*NED?TXD*9zbJlXTJ$I|x2w$zo1SAH$h;mP*o zyl8{H0Qz;NUe)R4wecvwyH$M^>D-4FzhJces{H`Nn653w<`x4J_We|WvNt~7?o49k zRcNl>j~{uvpDeNmm<(&_^`BhN9z|lEW1yTna#U(ph&%{#bGhk!$oqvU1EoGmh0c&c zgu7h<$;ODuS}$tUgP;ocO|_**DYov(gtk?b8IdWzn302kehbOrBbw}Du#!&HfMmH& zpHwUC#l+L;>~qyLQZajZypl|YDJ6Is}l_XkvU6vC}V>J`DAr_kG? 
z>*g$X)`)C!c&ElUxP)Fte!bH)S`yASbd?InQMgu-Nc6kU6Braw41-7(3^=>n;Y=kf z$Uzv06y*-6G!>{+q?(QGV+-|GdL6GvYb!RJ>2GsCZCmuX`MhiJv18jDiLVpD=I$#U zS?rMG<+K6Hxh_eW9ck9+)d9{74sh?i6uI%HL8MbeQSDi8#=nVTGnZu*Y0%jaotcbX z9c0yga;-XY*l}4~ML*SGb4r?zQebR995%D;)hMU%rm`cW-p))KBt2S+?vHZ zB+2CkM2PTLz>M{CG+fCQpr?75W+*GuDDQ)${y_d^W#e|ktJG|hHPgu3VMeS*QjHV> zUwV1&Udjjb>aI2*eEQH;ToS~C6MHg2OyQ(_zsy!BY zTc|WRrR*Mi7kJ}~RUqH#7z*>w zqS9y@7@3Z}Fleun+Y%PCc~xy_0FxjpKijK6qg(ui{j7BiR9X8PSA1Ur|aGmB^`DW+8q@`a9G%jz_>4^{8O77}DB z6xSYGKqL+@v)qhvwb?>nC#mjFMfoP)j6lfG?+=SJH{i?fYoWiI94b$c5nXi> zAXAsZKoCV+mN-((r1;V^xg=?Fo;LZCG;m_MM+jSsf0~nYn8~=fPy-8}{}81MyqYeb zVEu`|7BwJH@q$(<8b)KjeL^M3S-=G#BI*M_CpjXUk>n>vU{BY1h-Zk?th~B6<(P;^ zAY=2Ha`|z7d)oc9C5NLU8>s2WrF2_U!pTeIeu(`2grErkG0{eNtP^wGDdD82j9B-e zk(oK+3Q(_4-TqZap*78A)6$eCel_bn+>Mej4DA*ok$?n?q@nI>S^_sDf#Qi(Y|Q)K z+WJz8rHMBvUgEl9wycQ{SAm>7#OX4Uw$8O=;2Ad$?Nx`Kl!MW+-uN(|626ii^RP_= zY0*v%HAIRdLx?rCh8MErcvps^#xR+{;+8+tHpa7Y|kp3=w{1KxcY6C?|fi`*xx2;s%?{GPF*SkXa_ zfPjlY7>%bCpSW?5L^V#!6U-U2&{f*SP@6uqf*`Ye6oT31Pd}~du#%N@%w0Ko2|{R5 z`YOm=YeIKCRSi|942&dR4398VIGW|06Yb%CHXma#!-2U8N|e!6>yIM}gL94CE@Lkq zq6d{`&0EG-BENCrNV)%2{8S1v0o6)a*k;&BSJH*yMwxn@>?}qbLR(+g-H>X{B?@!^ zuBcs+h;qu(owmY4U?_1*&P)rfoSE)u;^;4QY7fQ7d#= zPTibIH=^FQBJKF$ZS90NuO!7`I~a`IAR7vu#Lm*(WRq)aJ{H`gZ+*o> zdOBnM=xBja!(k^M61e^{t_wTlRMMN6Y34zr)1$Dr#G}l3MrZ6K($!g;gEn9qVj>Fa2u)$8I9WXP;D$ST$$4rT zS-xF@o89Hg)N{%9bY-DxwhLA~0z)+vv1EdiG?tBVgBn_9g{&{OmZHTtJO`2+$X%(Q z?&G=7b=vJJb|CVQGEmfMT_n$ODomliN9KQwH=ZonW4&$tQbD`H&huOh>5D?bs_z2| z2^15itd3_5Dg+1Gt<_y6&utd`rjq;55besfsccYjkT(ER2Gqm4oQDaK8=08v?!#Pk z)#|tz!EX z0(fXkcq=^~YF#Uw+=^XZ0BR4JPGO^?6P$Clj52_5?>0mvo;wzuJc8IfqNYz~?A}It z8q%jj!JAK9YO_XJ_cgZeJ*gXoVEU>yGpdPOG>U1Y&0Vjkl8tjPuNq4gwcr3Z6 zOcG0m?6nGNOG*qxMMt7n#v0NT>9Ohui+mM=lcq}*2;$1g?D}L1x#JkE(m$1ud&1&H z`YR2@wV^DS-mjJrAfgg3&Wp<`qO{1@w7OKntwDPxI4?+E)9aM>JhnQA+=QVhUW5|M zEp5yBX!A?pScY$gtfMW=JKD5H-Nn6W6Y^4;kVp*%vv_;ag+-hZIVziV-j8r{bs;3dP;E(m+@9Wgi!6LWycQ`vj2J6j zP{0o@od@oM;vu;r2Pi2XZY=l#V@oG|j&Qn&a~hp6Ev%Mf!AYN!E}VLc76i;N29p7G 
z<1f~i6h$1f4@T<>sh^y7oghf>gk%Dwe)OjHjT+_x-z_3 z%hoRsAPh9CtPI+-Z-ohXzgeurG!n+D!_TNfBIB+}0^S0saHL0*(mq11Hvf3JdVRSK zTi)u|VBFGVRC(ifJ$ACyiY;z3-*SypYkURqZ)?6ym)e*;3h-=Mn?xcs?cQC&BtKoe zUQ(S2c-z!sVCMd2b-EJ~L@04f$q)6|mJr~bAy9pXAZn)qxn7)UQT2uFwgYc}^;xQrKPF?B?EagXjR=mRRDEBB z;TJB{5A+fJp~Db7k1$mcvNvIwY{)b@L7nipV6eawG z+D}mTCu*kcbPf0Gw zW_|5!-)5+xbL6z(VHqw~lf8p!x?l8poACujKuDg`SJS0rY@v=P(UY12`KylfnSMLC z4KHbZ)>N>Y5!x39c00&ek%B{ApTJ-HV6WS}w`eS<3xP=l8ksQn# zvniIN*oOx-zx_f>u*JqAwwKVTHgn_Cp2P+@)Lb+&C~`?RC153ITz z>4!>f|0g*;xs=zK_}-K)&IS(2uEAh6|!;oULu6Fp^fG zhxt=hOz$hK`G}`@@v;WxK(!t!USzmJ^=*VNX)13?gZVm`HCBhf6`au3Ix(_fIoF-q&j4cx=h zh#6}*TwCY^3J=4uCpeMzU5UkXKjjCS2nJQ@>FMhYwIIs1ak_vO6IaFObI}|O0Z;r6 zsyJ7w_7*_}AH?PuI_9s5IGC><;*U8<AA^A`NO=RsVCa7G9 zc+_Tz6Y2a?=Ss>iG)Jz!ylr$@Oun?-E+axMu?_nk$3gJ<;OgobGwm@QGQ4r?)w$@f z&+G+%axOPeyFifE^V(K=j7NxMDqJE(8u@If#qBHQ+e^pC*SOk63}&6fFP128vU%~7 zV#ihI!JSH>SHZ7{&u``1;0_sUDxPN)V~1qfW4uQvub7U*21}sivU&7?PA?o3mbUkf zjlU4x!u-5!pJO3F^tl^r6x@BVeq6Szs%hUX-2ZCT{`g}xY9|+V(BgH@63_&9wJ)ApGuF{K>8OH3<#yBT4K#>0j=}{|?0c2^jh_@Lx{G z|IQ`)i97UPx!+xlzlZ`nU6Or})kJ_^*n;@;&}gyt~JOAHe=?0e{Qw55-@) zAb+3wpU0km1oG|9=s^VhI0U`JeH()QO false) { + val oldClassLoader = Thread.currentThread().getContextClassLoader + + // This jar file should not be placed to the classpath; GenericUDTFCount3 is slightly + // modified version of GenericUDTFCount2 in hive/contrib, which emits the count for + // three times. + val jarPath = "src/test/TestUDTF-dynamicload.jar" + val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath" + + sql( + s""" + |CREATE FUNCTION udtf_count3 + |AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3' + |USING JAR '$jarURL' + """.stripMargin) + + assert(Thread.currentThread().getContextClassLoader eq oldClassLoader) + + // JAR will be loaded at first usage, and it will change the current thread's + // context classloader to jar classloader in sharedState. + // See SessionState.addJar for details. 
+ checkAnswer( + sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"), + Row(3) :: Row(3) :: Row(3) :: Nil) + + assert(Thread.currentThread().getContextClassLoader ne oldClassLoader) + assert(Thread.currentThread().getContextClassLoader eq + spark.sqlContext.sharedState.jarClassLoader) + + // Roll back to the original classloader and run query again. + Thread.currentThread().setContextClassLoader(oldClassLoader) + checkAnswer( + sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"), + Row(3) :: Row(3) :: Row(3) :: Nil) + } + } } From b801ac57c04d6d8384f735b79dfaefa5bee77ba2 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sat, 28 Dec 2019 11:55:35 +0900 Subject: [PATCH 3/6] Just apply Spark ClassLoader to the test as only controlling thread classloader for tests in SQLQuerySuite didn't work --- .../sql/hive/execution/SQLQuerySuite.scala | 72 +++++++++---------- 1 file changed, 33 insertions(+), 39 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 05cc5009742ab..ab507095a42ba 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils case class Nested1(f1: Nested2) case class Nested2(f2: Nested3) @@ -70,17 +71,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { import hiveContext._ import spark.implicits._ - private var threadContextClassLoader: ClassLoader = _ - - override protected def beforeEach(): Unit = { - threadContextClassLoader = 
Thread.currentThread().getContextClassLoader - } - - override protected def afterEach(): Unit = { - Thread.currentThread().setContextClassLoader(threadContextClassLoader) - } - - test("query global temp view") { val df = Seq(1).toDF("i1") df.createGlobalTempView("tbl1") @@ -2505,40 +2495,44 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("SPARK-26560 Spark should be able to run Hive UDF using jar regardless of " + "current thread context classloader") { - withUserDefinedFunction("udtf_count3" -> false) { - val oldClassLoader = Thread.currentThread().getContextClassLoader + // force to use Spark classloader as other test (even in other test suites) may change the + // current thread's context classloader to jar classloader + Utils.withContextClassLoader(Utils.getSparkClassLoader) { + withUserDefinedFunction("udtf_count3" -> false) { + val sparkClassLoader = Thread.currentThread().getContextClassLoader + + // This jar file should not be placed to the classpath; GenericUDTFCount3 is slightly + // modified version of GenericUDTFCount2 in hive/contrib, which emits the count for + // three times. + val jarPath = "src/test/TestUDTF-dynamicload.jar" + val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath" - // This jar file should not be placed to the classpath; GenericUDTFCount3 is slightly - // modified version of GenericUDTFCount2 in hive/contrib, which emits the count for - // three times. 
- val jarPath = "src/test/TestUDTF-dynamicload.jar" - val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath" - - sql( - s""" - |CREATE FUNCTION udtf_count3 - |AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3' - |USING JAR '$jarURL' + sql( + s""" + |CREATE FUNCTION udtf_count3 + |AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3' + |USING JAR '$jarURL' """.stripMargin) - assert(Thread.currentThread().getContextClassLoader eq oldClassLoader) + assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader) - // JAR will be loaded at first usage, and it will change the current thread's - // context classloader to jar classloader in sharedState. - // See SessionState.addJar for details. - checkAnswer( - sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"), - Row(3) :: Row(3) :: Row(3) :: Nil) + // JAR will be loaded at first usage, and it will change the current thread's + // context classloader to jar classloader in sharedState. + // See SessionState.addJar for details. + checkAnswer( + sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"), + Row(3) :: Row(3) :: Row(3) :: Nil) - assert(Thread.currentThread().getContextClassLoader ne oldClassLoader) - assert(Thread.currentThread().getContextClassLoader eq - spark.sqlContext.sharedState.jarClassLoader) + assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader) + assert(Thread.currentThread().getContextClassLoader eq + spark.sqlContext.sharedState.jarClassLoader) - // Roll back to the original classloader and run query again. - Thread.currentThread().setContextClassLoader(oldClassLoader) - checkAnswer( - sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"), - Row(3) :: Row(3) :: Row(3) :: Nil) + // Roll back to the original classloader and run query again. 
+ Thread.currentThread().setContextClassLoader(sparkClassLoader) + checkAnswer( + sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"), + Row(3) :: Row(3) :: Row(3) :: Nil) + } } } } From 418e027a345875c879b90408699c0b987929cc2c Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sun, 29 Dec 2019 09:30:02 +0900 Subject: [PATCH 4/6] Address review comments --- sql/hive/src/test/noclasspath/README | 1 + .../TestUDTF.jar} | Bin .../spark/sql/hive/execution/SQLQuerySuite.scala | 4 ++-- 3 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 sql/hive/src/test/noclasspath/README rename sql/hive/src/test/{TestUDTF-dynamicload.jar => noclasspath/TestUDTF.jar} (100%) diff --git a/sql/hive/src/test/noclasspath/README b/sql/hive/src/test/noclasspath/README new file mode 100644 index 0000000000000..8ce1b0bd09668 --- /dev/null +++ b/sql/hive/src/test/noclasspath/README @@ -0,0 +1 @@ +Place files which are being used as resources of tests but shouldn't be added to classpath. \ No newline at end of file diff --git a/sql/hive/src/test/TestUDTF-dynamicload.jar b/sql/hive/src/test/noclasspath/TestUDTF.jar similarity index 100% rename from sql/hive/src/test/TestUDTF-dynamicload.jar rename to sql/hive/src/test/noclasspath/TestUDTF.jar diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index ab507095a42ba..6eba030b7592e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2504,7 +2504,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { // This jar file should not be placed to the classpath; GenericUDTFCount3 is slightly // modified version of GenericUDTFCount2 in hive/contrib, which emits the count for // three times. 
- val jarPath = "src/test/TestUDTF-dynamicload.jar" + val jarPath = "src/test/noclasspath/TestUDTF.jar" val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath" sql( @@ -2512,7 +2512,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |CREATE FUNCTION udtf_count3 |AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3' |USING JAR '$jarURL' - """.stripMargin) + """.stripMargin) assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader) From 988061bf878b5c5685a0a0eb9fdabe05e1b4eaa8 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sun, 29 Dec 2019 20:20:18 +0900 Subject: [PATCH 5/6] Rename jar file as it seems to be conflicted with TestUDTF.jar in classpath --- .../{TestUDTF.jar => TestUDTF-spark-26560.jar} | Bin .../spark/sql/hive/execution/SQLQuerySuite.scala | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename sql/hive/src/test/noclasspath/{TestUDTF.jar => TestUDTF-spark-26560.jar} (100%) diff --git a/sql/hive/src/test/noclasspath/TestUDTF.jar b/sql/hive/src/test/noclasspath/TestUDTF-spark-26560.jar similarity index 100% rename from sql/hive/src/test/noclasspath/TestUDTF.jar rename to sql/hive/src/test/noclasspath/TestUDTF-spark-26560.jar diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 6eba030b7592e..3c9fa24ce32b5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2504,7 +2504,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { // This jar file should not be placed to the classpath; GenericUDTFCount3 is slightly // modified version of GenericUDTFCount2 in hive/contrib, which emits the count for // three times. 
- val jarPath = "src/test/noclasspath/TestUDTF.jar" + val jarPath = "src/test/noclasspath/TestUDTF-spark-26560.jar" val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath" sql( From 39a2171d361f38b54640c07ccb8990aa4c204238 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Mon, 30 Dec 2019 20:11:48 +0900 Subject: [PATCH 6/6] Explanation of the reason to change the classloader in test --- .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 3c9fa24ce32b5..539b464743461 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2527,7 +2527,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { assert(Thread.currentThread().getContextClassLoader eq spark.sqlContext.sharedState.jarClassLoader) - // Roll back to the original classloader and run query again. + // Roll back to the original classloader and run query again. Without this line, the test + // would pass, as thread's context classloader is changed to jar classloader. But thread + // context classloader can be changed by others as well, which would fail the query; one + // example is spark-shell, whose thread context classloader rolls back automatically. This + // mimics the behavior of spark-shell. Thread.currentThread().setContextClassLoader(sparkClassLoader) checkAnswer( sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),