From ab4b6b15fdaf580193be5f1c2b9befa9e04a207b Mon Sep 17 00:00:00 2001 From: Marius van Niekerk Date: Sat, 18 Mar 2017 10:09:26 -0400 Subject: [PATCH 1/6] SPARK-11421 Squashed addjar pr --- R/pkg/R/context.R | 28 ++++++++++++++++ R/pkg/tests/fulltests/test_context.R | 12 +++++++ .../scala/org/apache/spark/SparkContext.scala | 30 ++++++++++++++++- .../scala/org/apache/spark/TestUtils.scala | 21 ++++++++++++ .../org/apache/spark/SparkContextSuite.scala | 32 +++++++++++++++++-- .../spark/deploy/SparkSubmitSuite.scala | 19 +++-------- python/pyspark/context.py | 15 +++++++++ python/pyspark/tests.py | 14 ++++++++ 8 files changed, 153 insertions(+), 18 deletions(-) diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 443c2ff8f9ace..f3ab5670b40c0 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -319,6 +319,34 @@ spark.addFile <- function(path, recursive = FALSE) { invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path)), recursive)) } + +#' Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. +#' +#' The \code{path} passed can be either a local file, a file in HDFS (or other Hadoop-supported +#' filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. +#' If \code{addToCurrentClassLoader} is true, add the jar to the current threads' classloader. In +#' general adding to the current threads' class loader will impact all other application threads +#' unless they have explicitly changed their class loader. +#' +#' @rdname spark.addJar +#' @param path The path of the jar to be added +#' @param addToCurrentClassLoader Whether to add the jar to the current driver classloader. +#' Default is FALSE. +#' @export +#' @examples +#'\dontrun{ +#' spark.addJar("/path/to/something.jar", TRUE) +#'} +#' @note spark.addJar since 2.2.0 +spark.addJar <- function(path, addToCurrentClassLoader = FALSE) { + sc <- getSparkContext() + normalizedPath <- suppressWarnings(normalizePath(path)) + scala_sc <- callJMethod(sc, "sc") + invisible(callJMethod(scala_sc, "addJar", normalizedPath, addToCurrentClassLoader)) +} + + + #' Get the root directory that contains files added through spark.addFile. #' #' @rdname spark.getSparkFilesRootDirectory diff --git a/R/pkg/tests/fulltests/test_context.R b/R/pkg/tests/fulltests/test_context.R index 77635c5a256b9..8e26544737b88 100644 --- a/R/pkg/tests/fulltests/test_context.R +++ b/R/pkg/tests/fulltests/test_context.R @@ -168,6 +168,18 @@ test_that("spark.lapply should perform simple transforms", { sparkR.session.stop() }) +test_that("add jar should work and allow usage of the jar on the driver node", { + sparkR.sparkContext() + + destDir <- paste0(tempdir(), "/", "testjar") + jarName <- callJStatic("org.apache.spark.TestUtils", "createDummyJar", + destDir, "sparkrTests", "DummyClassForAddJarTest") + + spark.addJar(jarName, addToCurrentClassLoader = TRUE) + testClass <- newJObject("sparkrTests.DummyClassForAddJarTest") + expect_true(class(testClass) == "jobj") +}) + test_that("add and get file to be downloaded with Spark job on every node", { sparkR.sparkContext(master = sparkRTestMaster) # Test add file. diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6f25d346e6e54..9aca6e9910601 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1802,7 +1802,20 @@ class SparkContext(config: SparkConf) extends Logging { * @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), * an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. */ - def addJar(path: String) { + def addJar(path: String): Unit = { + addJar(path, false) + } + + /** + * Adds a JAR dependency for all tasks to be executed on this `SparkContext` in the future. + * @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), + * an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. + * @param addToCurrentClassLoader if true will add the jar to the current threads' classloader. + * In general adding to the current threads' class loader will impact all other application + * threads unless they have explicitly changed their class loader. + */ + @DeveloperApi + def addJar(path: String, addToCurrentClassLoader: Boolean) { def addJarFile(file: File): String = { try { if (!file.exists()) { @@ -1844,6 +1857,21 @@ class SparkContext(config: SparkConf) extends Logging { logInfo(s"Added JAR $path at $key with timestamp $timestamp") postEnvironmentUpdate() } + + if (addToCurrentClassLoader) { + val currentCL = Utils.getContextOrSparkClassLoader + currentCL match { + case cl: MutableURLClassLoader => + val uri = if (path.contains("\\")) { + // For local paths with backslashes on Windows, URI throws an exception + new File(path).toURI + } else { + new URI(path) + } + cl.addURL(uri.toURL) + case _ => logWarning(s"Unsupported cl $currentCL will not update jars thread cl") + } + } } } } diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index a80016dd22fc5..736c73c81b4ac 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -168,6 +168,27 @@ private[spark] object TestUtils { createCompiledClass(className, destDir, sourceFile, classpathUrls) } + /** Create a dummy compile jar for a given package, classname. Jar will be placed in destDir */ + def createDummyJar(destDir: String, packageName: String, className: String): String = { + val srcDir = new File(destDir, packageName) + srcDir.mkdirs() + val excSource = new JavaSourceFromString(new File(srcDir, className).toURI.getPath, + s"""package $packageName; + | + |public class $className implements java.io.Serializable { + | public static String helloWorld(String arg) { return "Hello " + arg; } + | public static int addStuff(int arg1, int arg2) { return arg1 + arg2; } + |} + """. + stripMargin) + val excFile = createCompiledClass(className, srcDir, excSource, Seq.empty) + val jarFile = new File(destDir, + s"$packageName-$className-%s.jar".format(System.currentTimeMillis())) + val jarURL = createJar(Seq(excFile), jarFile, directoryPrefix = Some(packageName)) + jarURL.toString + } + + /** * Run some code involving jobs submitted to the given context and assert that the jobs spilled. */ diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 0ed5f26863dad..a5003ab2cf8be 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark import java.io.File -import java.net.{MalformedURLException, URI} +import java.net.{MalformedURLException, URI, URL} import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit @@ -34,7 +34,7 @@ import org.scalatest.Matchers._ import org.scalatest.concurrent.Eventually import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart} -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.{MutableURLClassLoader, ThreadUtils, Utils} class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually { @@ -309,6 +309,34 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu assert(sc.listJars().head.contains(tmpJar.getName)) } + Seq("local_mode", "non_local_mode").foreach { schedulingMode => + val tempDir = Utils.createTempDir().toString + val master = schedulingMode match { + case "local_mode" => "local" + case "non_local_mode" => "local-cluster[1,1,1024]" + } + val packageName = s"scala_$schedulingMode" + val className = "DummyClass" + val jarPath = TestUtils.createDummyJar(tempDir, packageName, className) + + // ensure we reset the classloader after the test completes + val originalClassLoader = Thread.currentThread.getContextClassLoader + try { + // load the exception from the jar + val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader) + + test(s"jar can be added and used driver side in $schedulingMode") { + sc = new SparkContext(master, "test") + Thread.currentThread().setContextClassLoader(loader) + sc.addJar(jarPath, addToCurrentClassLoader = true) + val cl = Utils.getContextOrSparkClassLoader + cl.loadClass(s"$packageName.$className") + } + } finally { + Thread.currentThread.setContextClassLoader(originalClassLoader) + } + } + test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") { try { sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index cfbf56fb8c369..ed8809145b56d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy import java.io._ import java.net.URI +import java.net.URL import java.nio.charset.StandardCharsets import java.nio.file.Files @@ -549,26 +550,14 @@ class SparkSubmitSuite Seq(sparkHome, "R", "pkg", "tests", "fulltests", "jarTest.R").mkString(File.separator) assert(new File(rScriptDir).exists) + val tempDir = Utils.createTempDir().toString // compile a small jar containing a class that will be called from R code. - val tempDir = Utils.createTempDir() - val srcDir = new File(tempDir, "sparkrtest") - srcDir.mkdirs() - val excSource = new JavaSourceFromString(new File(srcDir, "DummyClass").toURI.getPath, - """package sparkrtest; - | - |public class DummyClass implements java.io.Serializable { - | public static String helloWorld(String arg) { return "Hello " + arg; } - | public static int addStuff(int arg1, int arg2) { return arg1 + arg2; } - |} - """.stripMargin) - val excFile = TestUtils.createCompiledClass("DummyClass", srcDir, excSource, Seq.empty) - val jarFile = new File(tempDir, "sparkRTestJar-%s.jar".format(System.currentTimeMillis())) - val jarURL = TestUtils.createJar(Seq(excFile), jarFile, directoryPrefix = Some("sparkrtest")) + val jarURL = TestUtils.createDummyJar(tempDir, "sparkrtest", "DummyClass") val args = Seq( "--name", "testApp", "--master", "local", - "--jars", jarURL.toString, + "--jars", jarURL, "--verbose", "--conf", "spark.ui.enabled=false", rScriptDir) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index a33f6dcf31fc0..ec8a277e863d8 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -860,6 +860,21 @@ def addPyFile(self, path): import importlib importlib.invalidate_caches() + def addJar(self, path, addToCurrentClassLoader=False): + """ + Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. + The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported + filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. + If addToCurrentClassLoader is true, add the jar to the current threads' classloader. + In general adding to the current threads' class loader will impact all other application + threads unless they have explicitly changed their class loader. + + :param path: The path of the jar to be added + :param addToCurrentClassLoader: Whether to add the jar to the current driver classloader. + This defaults to False. + """ + self._jsc.sc().addJar(path, addToCurrentClassLoader) + def setCheckpointDir(self, dirName): """ Set the directory under which RDDs are going to be checkpointed. The diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index da99872da2f0e..fef969ea1088f 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -35,6 +35,7 @@ import hashlib from py4j.protocol import Py4JJavaError +from py4j.java_gateway import JavaClass try: import xmlrunner except ImportError: @@ -435,6 +436,19 @@ def test_add_file_locally(self): with open(download_path) as test_file: self.assertEqual("Hello World!\n", test_file.readline()) + def test_add_jar(self): + jvm = self.sc._jvm + # We shouldn't be able to load anything from the package before it is added + self.assertFalse(isinstance(jvm.pysparktests.DummyClass, JavaClass)) + # Generate and compile the test jar + destDir = os.path.join(SPARK_HOME, "python/test_support/jar") + jarName = jvm.org.apache.spark.TestUtils.createDummyJar( + destDir, "pysparktests", "DummyClass") + # Load the new jar + self.sc.addJar(jarName, True) + # Try and load the class + self.assertTrue(isinstance(jvm.pysparktests.DummyClass, JavaClass)) + def test_add_file_recursively_locally(self): path = os.path.join(SPARK_HOME, "python/test_support/hello") self.sc.addFile(path, True) From 3c321740f27ebe09dc387005d88cbe6630877e0b Mon Sep 17 00:00:00 2001 From: Marius van Niekerk Date: Sat, 18 Mar 2017 10:25:07 -0400 Subject: [PATCH 2/6] Addressed some review comments --- R/pkg/NAMESPACE | 1 + R/pkg/R/context.R | 6 ++---- .../src/test/scala/org/apache/spark/SparkContextSuite.scala | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 3fc756b9ef40c..20aab2330c626 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -409,6 +409,7 @@ export("as.DataFrame", "setCurrentDatabase", "spark.lapply", "spark.addFile", + "spark.addJar", "spark.getSparkFilesRootDirectory", "spark.getSparkFiles", "sql", diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index f3ab5670b40c0..d2c74b9aedce1 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -331,7 +331,6 @@ spark.addFile <- function(path, recursive = FALSE) { #' @rdname spark.addJar #' @param path The path of the jar to be added #' @param addToCurrentClassLoader Whether to add the jar to the current driver classloader. -#' Default is FALSE. #' @export #' @examples #'\dontrun{ @@ -339,10 +338,9 @@ spark.addFile <- function(path, recursive = FALSE) { #'} #' @note spark.addJar since 2.2.0 spark.addJar <- function(path, addToCurrentClassLoader = FALSE) { - sc <- getSparkContext() normalizedPath <- suppressWarnings(normalizePath(path)) - scala_sc <- callJMethod(sc, "sc") - invisible(callJMethod(scala_sc, "addJar", normalizedPath, addToCurrentClassLoader)) + sc <- callJMethod(getSparkContext(), "sc") + invisible(callJMethod(sc, "addJar", normalizedPath, addToCurrentClassLoader)) } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index a5003ab2cf8be..44aacd66c4734 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -317,7 +317,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } val packageName = s"scala_$schedulingMode" val className = "DummyClass" - val jarPath = TestUtils.createDummyJar(tempDir, packageName, className) + val jarURI = TestUtils.createDummyJar(tempDir, packageName, className) // ensure we reset the classloader after the test completes val originalClassLoader = Thread.currentThread.getContextClassLoader @@ -328,7 +328,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu test(s"jar can be added and used driver side in $schedulingMode") { sc = new SparkContext(master, "test") Thread.currentThread().setContextClassLoader(loader) - sc.addJar(jarPath, addToCurrentClassLoader = true) + sc.addJar(jarURI, addToCurrentClassLoader = true) val cl = Utils.getContextOrSparkClassLoader cl.loadClass(s"$packageName.$className") } From a62367fd5d22dd8dc9f3a9ee0efd626355e684d7 Mon Sep 17 00:00:00 2001 From: Marius van Niekerk Date: Sat, 18 Mar 2017 10:30:48 -0400 Subject: [PATCH 3/6] Addressed some review comments --- R/pkg/tests/fulltests/test_context.R | 2 +- core/src/main/scala/org/apache/spark/SparkContext.scala | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/R/pkg/tests/fulltests/test_context.R b/R/pkg/tests/fulltests/test_context.R index 8e26544737b88..d6d0e7536b1f5 100644 --- a/R/pkg/tests/fulltests/test_context.R +++ b/R/pkg/tests/fulltests/test_context.R @@ -171,7 +171,7 @@ test_that("spark.lapply should perform simple transforms", { test_that("add jar should work and allow usage of the jar on the driver node", { sparkR.sparkContext() - destDir <- paste0(tempdir(), "/", "testjar") + destDir <- file.path(tempdir(), "testjar") jarName <- callJStatic("org.apache.spark.TestUtils", "createDummyJar", destDir, "sparkrTests", "DummyClassForAddJarTest") diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 9aca6e9910601..6112dd483f7cf 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1800,7 +1800,7 @@ class SparkContext(config: SparkConf) extends Logging { * If a jar is added during execution, it will not be available until the next TaskSet starts. * * @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), - * an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. + * an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. */ def addJar(path: String): Unit = { addJar(path, false) @@ -1809,10 +1809,11 @@ class SparkContext(config: SparkConf) extends Logging { /** * Adds a JAR dependency for all tasks to be executed on this `SparkContext` in the future. * @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), - * an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. + * an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. * @param addToCurrentClassLoader if true will add the jar to the current threads' classloader. - * In general adding to the current threads' class loader will impact all other application - * threads unless they have explicitly changed their class loader. + * In general adding to the current threads' class loader will + * impact all other application threads unless they have explicitly + * changed their class loader. */ @DeveloperApi def addJar(path: String, addToCurrentClassLoader: Boolean) { From b928ab82c54a72f91c01ba9d3418c80c58ab9da1 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 2 Nov 2017 20:27:22 +0900 Subject: [PATCH 4/6] Address the rest of comments --- R/pkg/R/context.R | 13 +++----- R/pkg/tests/fulltests/test_context.R | 8 +++-- .../scala/org/apache/spark/SparkContext.scala | 30 ++++++++----------- .../scala/org/apache/spark/TestUtils.scala | 16 +++++----- .../org/apache/spark/SparkContextSuite.scala | 4 +-- .../spark/deploy/SparkSubmitSuite.scala | 6 ++-- python/pyspark/context.py | 8 ++--- python/pyspark/tests.py | 19 +++++++----- 8 files changed, 48 insertions(+), 56 deletions(-) diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index d2c74b9aedce1..1fc5d0a7a0417 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -319,32 +319,27 @@ spark.addFile <- function(path, recursive = FALSE) { invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path)), recursive)) } - -#' Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. +#' Adds a JAR dependency for Spark tasks to be executed in the future. #' #' The \code{path} passed can be either a local file, a file in HDFS (or other Hadoop-supported #' filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. -#' If \code{addToCurrentClassLoader} is true, add the jar to the current threads' classloader. In -#' general adding to the current threads' class loader will impact all other application threads -#' unless they have explicitly changed their class loader. +#' If \code{addToCurrentClassLoader} is true, add the jar to the current driver. #' #' @rdname spark.addJar #' @param path The path of the jar to be added -#' @param addToCurrentClassLoader Whether to add the jar to the current driver classloader. +#' @param addToCurrentClassLoader Whether to add the jar to the current driver class loader. #' @export #' @examples #'\dontrun{ #' spark.addJar("/path/to/something.jar", TRUE) #'} -#' @note spark.addJar since 2.2.0 +#' @note spark.addJar since 2.3.0 spark.addJar <- function(path, addToCurrentClassLoader = FALSE) { normalizedPath <- suppressWarnings(normalizePath(path)) sc <- callJMethod(getSparkContext(), "sc") invisible(callJMethod(sc, "addJar", normalizedPath, addToCurrentClassLoader)) } - - #' Get the root directory that contains files added through spark.addFile. #' #' @rdname spark.getSparkFilesRootDirectory diff --git a/R/pkg/tests/fulltests/test_context.R b/R/pkg/tests/fulltests/test_context.R index d6d0e7536b1f5..9742296adb0a6 100644 --- a/R/pkg/tests/fulltests/test_context.R +++ b/R/pkg/tests/fulltests/test_context.R @@ -172,12 +172,14 @@ test_that("add jar should work and allow usage of the jar on the driver node", { sparkR.sparkContext() destDir <- file.path(tempdir(), "testjar") - jarName <- callJStatic("org.apache.spark.TestUtils", "createDummyJar", - destDir, "sparkrTests", "DummyClassForAddJarTest") + jarFile <- callJStatic("org.apache.spark.TestUtils", "createDummyJar", + destDir, "sparkrTests", "DummyClassForAddJarTest") + jarPath <- callJMethod(jarFile, "getAbsolutePath") - spark.addJar(jarName, addToCurrentClassLoader = TRUE) + spark.addJar(jarPath, addToCurrentClassLoader = TRUE) testClass <- newJObject("sparkrTests.DummyClassForAddJarTest") expect_true(class(testClass) == "jobj") + unlink(destDir) }) test_that("add and get file to be downloaded with Spark job on every node", { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6112dd483f7cf..647edbd53aacf 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1800,23 +1800,23 @@ class SparkContext(config: SparkConf) extends Logging { * If a jar is added during execution, it will not be available until the next TaskSet starts. * * @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), - * an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. + * an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. */ def addJar(path: String): Unit = { - addJar(path, false) + addJar(path, addToCurrentClassLoader = false) } /** * Adds a JAR dependency for all tasks to be executed on this `SparkContext` in the future. + * * @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), - * an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. + * an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. * @param addToCurrentClassLoader if true will add the jar to the current threads' classloader. - * In general adding to the current threads' class loader will - * impact all other application threads unless they have explicitly - * changed their class loader. + * In general adding to the current threads' class loader will impact all other application + * threads unless they have explicitly changed their class loader. */ @DeveloperApi - def addJar(path: String, addToCurrentClassLoader: Boolean) { + def addJar(path: String, addToCurrentClassLoader: Boolean): Unit = { def addJarFile(file: File): String = { try { if (!file.exists()) { @@ -1852,6 +1852,7 @@ class SparkContext(config: SparkConf) extends Logging { case _ => path } } + if (key != null) { val timestamp = System.currentTimeMillis if (addedJars.putIfAbsent(key, timestamp).isEmpty) { @@ -1860,17 +1861,10 @@ class SparkContext(config: SparkConf) extends Logging { } if (addToCurrentClassLoader) { - val currentCL = Utils.getContextOrSparkClassLoader - currentCL match { - case cl: MutableURLClassLoader => - val uri = if (path.contains("\\")) { - // For local paths with backslashes on Windows, URI throws an exception - new File(path).toURI - } else { - new URI(path) - } - cl.addURL(uri.toURL) - case _ => logWarning(s"Unsupported cl $currentCL will not update jars thread cl") + Utils.getContextOrSparkClassLoader match { + case cl: MutableURLClassLoader => cl.addURL(Utils.resolveURI(path).toURL) + case cl => logWarning( + s"Unsupported class loader $cl will not update jars in the thread class loader.") } } } diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 736c73c81b4ac..2878152dfa7de 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -169,26 +169,26 @@ private[spark] object TestUtils { } /** Create a dummy compile jar for a given package, classname. Jar will be placed in destDir */ - def createDummyJar(destDir: String, packageName: String, className: String): String = { + def createDummyJar(destDir: String, packageName: String, className: String): File = { val srcDir = new File(destDir, packageName) srcDir.mkdirs() - val excSource = new JavaSourceFromString(new File(srcDir, className).toURI.getPath, - s"""package $packageName; + val excSource = new JavaSourceFromString( + new File(srcDir, className).toURI.getPath, + s""" + |package $packageName; | |public class $className implements java.io.Serializable { | public static String helloWorld(String arg) { return "Hello " + arg; } | public static int addStuff(int arg1, int arg2) { return arg1 + arg2; } |} - """. - stripMargin) + """.stripMargin) val excFile = createCompiledClass(className, srcDir, excSource, Seq.empty) val jarFile = new File(destDir, s"$packageName-$className-%s.jar".format(System.currentTimeMillis())) - val jarURL = createJar(Seq(excFile), jarFile, directoryPrefix = Some(packageName)) - jarURL.toString + createJar(Seq(excFile), jarFile, directoryPrefix = Some(packageName)) + jarFile } - /** * Run some code involving jobs submitted to the given context and assert that the jobs spilled. */ diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 44aacd66c4734..7339049920a9d 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -317,7 +317,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } val packageName = s"scala_$schedulingMode" val className = "DummyClass" - val jarURI = TestUtils.createDummyJar(tempDir, packageName, className) + val jarURI = TestUtils.createDummyJar(tempDir, packageName, className).toURI // ensure we reset the classloader after the test completes val originalClassLoader = Thread.currentThread.getContextClassLoader @@ -328,7 +328,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu test(s"jar can be added and used driver side in $schedulingMode") { sc = new SparkContext(master, "test") Thread.currentThread().setContextClassLoader(loader) - sc.addJar(jarURI, addToCurrentClassLoader = true) + sc.addJar(jarURI.toString, addToCurrentClassLoader = true) val cl = Utils.getContextOrSparkClassLoader cl.loadClass(s"$packageName.$className") } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index ed8809145b56d..e02dfb578fdcb 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -550,14 +550,14 @@ class SparkSubmitSuite Seq(sparkHome, "R", "pkg", "tests", "fulltests", "jarTest.R").mkString(File.separator) assert(new File(rScriptDir).exists) - val tempDir = Utils.createTempDir().toString + val tempDir = Utils.createTempDir().getAbsolutePath // compile a small jar containing a class that will be called from R code. - val jarURL = TestUtils.createDummyJar(tempDir, "sparkrtest", "DummyClass") + val jarURL = TestUtils.createDummyJar(tempDir, "sparkrtest", "DummyClass").toURI.toURL val args = Seq( "--name", "testApp", "--master", "local", - "--jars", jarURL, + "--jars", jarURL.toString, "--verbose", "--conf", "spark.ui.enabled=false", rScriptDir) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index ec8a277e863d8..da7e1e8794376 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -862,15 +862,13 @@ def addPyFile(self, path): def addJar(self, path, addToCurrentClassLoader=False): """ - Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. + Adds a JAR dependency for Spark tasks to be executed in the future. The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. - If addToCurrentClassLoader is true, add the jar to the current threads' classloader. - In general adding to the current threads' class loader will impact all other application - threads unless they have explicitly changed their class loader. + If `addToCurrentClassLoader` is true, add the jar to the current driver. :param path: The path of the jar to be added - :param addToCurrentClassLoader: Whether to add the jar to the current driver classloader. + :param addToCurrentClassLoader: Whether to add the jar to the current driver class loader. This defaults to False. """ self._jsc.sc().addJar(path, addToCurrentClassLoader) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index fef969ea1088f..c7eb8b8992332 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -440,14 +440,17 @@ def test_add_jar(self): jvm = self.sc._jvm # We shouldn't be able to load anything from the package before it is added self.assertFalse(isinstance(jvm.pysparktests.DummyClass, JavaClass)) - # Generate and compile the test jar - destDir = os.path.join(SPARK_HOME, "python/test_support/jar") - jarName = jvm.org.apache.spark.TestUtils.createDummyJar( - destDir, "pysparktests", "DummyClass") - # Load the new jar - self.sc.addJar(jarName, True) - # Try and load the class - self.assertTrue(isinstance(jvm.pysparktests.DummyClass, JavaClass)) + try: + # Generate and compile the test jar + destDir = tempfile.mkdtemp() + jarPath = jvm.org.apache.spark.TestUtils.createDummyJar( + destDir, "pysparktests", "DummyClass").getAbsolutePath() + # Load the new jar + self.sc.addJar(jarPath, True) + # Try and load the class + self.assertTrue(isinstance(jvm.pysparktests.DummyClass, JavaClass)) + finally: + shutil.rmtree(destDir) def test_add_file_recursively_locally(self): path = os.path.join(SPARK_HOME, "python/test_support/hello") From f839240cc3b6047c9e4a21a93dc24c0b2a9018b6 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 8 Nov 2017 20:17:33 +0900 Subject: [PATCH 5/6] Fix comments --- R/pkg/R/context.R | 7 ++++++- core/src/main/scala/org/apache/spark/TestUtils.scala | 2 +- python/pyspark/context.py | 8 ++++++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 1fc5d0a7a0417..9b9c2a1480a9c 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -323,7 +323,12 @@ spark.addFile <- function(path, recursive = FALSE) { #' #' The \code{path} passed can be either a local file, a file in HDFS (or other Hadoop-supported #' filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. -#' If \code{addToCurrentClassLoader} is true, add the jar to the current driver. +#' If \code{addToCurrentClassLoader} is true, add the jar to the current threads' class loader +#' in the backing JVM. In general adding to the current threads' class loader will impact all +#' other application threads unless they have explicitly changed their class loader. +#' +#' Note: \code{addToCurrentClassLoader} parameter is a developer API, which change or be removed +#' in minor versions of Spark. #' #' @rdname spark.addJar #' @param path The path of the jar to be added diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 2878152dfa7de..867baab5a0f0d 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -168,7 +168,7 @@ private[spark] object TestUtils { createCompiledClass(className, destDir, sourceFile, classpathUrls) } - /** Create a dummy compile jar for a given package, classname. Jar will be placed in destDir */ + /** Create a dummy compile jar for a given package, classname. Jar will be placed in destDir */ def createDummyJar(destDir: String, packageName: String, className: String): File = { val srcDir = new File(destDir, packageName) srcDir.mkdirs() diff --git a/python/pyspark/context.py b/python/pyspark/context.py index da7e1e8794376..55c601b982028 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -865,11 +865,15 @@ def addJar(self, path, addToCurrentClassLoader=False): Adds a JAR dependency for Spark tasks to be executed in the future. The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. - If `addToCurrentClassLoader` is true, add the jar to the current driver. + If `addToCurrentClassLoader` is true, add the jar to the current threads' class loader + in the backing JVM. In general adding to the current threads' class loader will impact all + other application threads unless they have explicitly changed their class loader. + + .. note:: `addToCurrentClassLoader` parameter is a developer API, which change or be removed + in minor versions of Spark. :param path: The path of the jar to be added :param addToCurrentClassLoader: Whether to add the jar to the current driver class loader. - This defaults to False. """ self._jsc.sc().addJar(path, addToCurrentClassLoader) From ab5280944d2c82debed68c4df1c1684bb0db88c6 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 8 Nov 2017 20:20:33 +0900 Subject: [PATCH 6/6] classloader -> class loader --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 647edbd53aacf..b6257c8a7a6ba 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1811,7 +1811,7 @@ class SparkContext(config: SparkConf) extends Logging { * * @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), * an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. - * @param addToCurrentClassLoader if true will add the jar to the current threads' classloader. + * @param addToCurrentClassLoader if true will add the jar to the current threads' class loader. * In general adding to the current threads' class loader will impact all other application * threads unless they have explicitly changed their class loader. */