From 2590b59b54d733111b896c3b327213da71912e76 Mon Sep 17 00:00:00 2001 From: Jesse Cai Date: Thu, 11 Jul 2019 15:40:21 -0700 Subject: [PATCH 1/5] Add UDF threshold conf and replace in _prepare_for_python_RDD --- .../scala/org/apache/spark/api/python/PythonUtils.scala | 4 ++++ .../scala/org/apache/spark/internal/config/package.scala | 6 ++++++ python/pyspark/rdd.py | 2 +- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index eee6e4b28ac47..2ebf0d452f620 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -81,4 +81,8 @@ private[spark] object PythonUtils { def isEncryptionEnabled(sc: JavaSparkContext): Boolean = { sc.conf.get(org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED) } + + def getBroadcastThreshold(sc: JavaSparkContext): Long = { + sc.conf.get(org.apache.spark.internal.config.BROADCAST_UDF_THRESHOLD) + } } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 488886f1627f2..7507b369dae48 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1246,6 +1246,12 @@ package object config { "mechanisms to guarantee data won't be corrupted during broadcast") .booleanConf.createWithDefault(true) + private[spark] val BROADCAST_UDF_THRESHOLD = ConfigBuilder("spark.broadcast.UDFThreshold") + .doc("The threshold at which a serialized command is compressed by broadcast, in " + + "bytes unless otherwise specified") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(1 << 20) + private[spark] val RDD_COMPRESS = ConfigBuilder("spark.rdd.compress") .doc("Whether to compress serialized RDD partitions " + "(e.g. for StorageLevel.MEMORY_ONLY_SER in Scala " + diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 8bcc67ab1c3e6..96fdf5f33b39d 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2490,7 +2490,7 @@ def _prepare_for_python_RDD(sc, command): # the serialized command will be compressed by broadcast ser = CloudPickleSerializer() pickled_command = ser.dumps(command) - if len(pickled_command) > (1 << 20): # 1M + if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc): # Default 1M # The broadcast will have same life cycle as created PythonRDD broadcast = sc.broadcast(pickled_command) pickled_command = ser.dumps(broadcast) From ca7de402cf6ba774db92f1ef995c58740bc8538c Mon Sep 17 00:00:00 2001 From: Jesse Cai Date: Thu, 11 Jul 2019 17:12:15 -0700 Subject: [PATCH 2/5] Address comments --- .../org/apache/spark/api/python/PythonUtils.scala | 2 +- .../org/apache/spark/internal/config/package.scala | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index 2ebf0d452f620..62d60475985b3 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -83,6 +83,6 @@ private[spark] object PythonUtils { } def getBroadcastThreshold(sc: JavaSparkContext): Long = { - sc.conf.get(org.apache.spark.internal.config.BROADCAST_UDF_THRESHOLD) + sc.conf.get(org.apache.spark.internal.config.BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD) } } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7507b369dae48..59df17974cc5c 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1246,11 +1246,13 @@ package object config { "mechanisms to guarantee data won't be corrupted during broadcast") .booleanConf.createWithDefault(true) - private[spark] val BROADCAST_UDF_THRESHOLD = ConfigBuilder("spark.broadcast.UDFThreshold") - .doc("The threshold at which a serialized command is compressed by broadcast, in " + - "bytes unless otherwise specified") - .bytesConf(ByteUnit.BYTE) - .createWithDefault(1 << 20) + private[spark] val BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD = + ConfigBuilder("spark.broadcast.UDFCompressionThreshold") + .doc("The threshold at which a a user-defined function (UDF) is compressed by broadcast, " + + "in bytes unless otherwise specified") + .bytesConf(ByteUnit.BYTE) + .checkValue(v => v >= 0, "The threshold should be non-negative.") + .createWithDefault(1L * 1024 * 1024) private[spark] val RDD_COMPRESS = ConfigBuilder("spark.rdd.compress") .doc("Whether to compress serialized RDD partitions " + From c716eb7c3082a3eb2059dd8fb424f992ab3c9691 Mon Sep 17 00:00:00 2001 From: Jesse Cai Date: Fri, 12 Jul 2019 08:58:21 -0700 Subject: [PATCH 3/5] Fix description --- .../main/scala/org/apache/spark/internal/config/package.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 59df17974cc5c..76d3d6ee3d8f8 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1248,8 +1248,8 @@ package object config { private[spark] val BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD = ConfigBuilder("spark.broadcast.UDFCompressionThreshold") - .doc("The threshold at which a a user-defined function (UDF) is compressed by broadcast, " + - "in bytes unless otherwise specified") + .doc("The threshold at which user-defined functions (UDFs) and Python RDD commands " + + "are compressed by broadcast in bytes unless otherwise specified") .bytesConf(ByteUnit.BYTE) .checkValue(v => v >= 0, "The threshold should be non-negative.") .createWithDefault(1L * 1024 * 1024) From f34d3bd0ac47fb2cac1d17660fbf48dc43ee2f45 Mon Sep 17 00:00:00 2001 From: Jesse Cai Date: Fri, 12 Jul 2019 16:05:17 -0700 Subject: [PATCH 4/5] Add test case --- .../test/scala/org/apache/spark/SparkConfSuite.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 6be1fedc123d8..443244386e692 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -389,6 +389,16 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst """.stripMargin.trim) } + test("SPARK-28355: Use Spark conf for threshold at which UDFs are compressed by broadcast") { + val conf = new SparkConf() + + // Set the conf + conf.set(BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD, 1L * 1024) + + // Verify that it has been set properly + assert(conf.get(BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD) === 1L * 1024) + } + val defaultIllegalValue = "SomeIllegalValue" val illegalValueTests : Map[String, (SparkConf, String) => Any] = Map( "getTimeAsSeconds" -> (_.getTimeAsSeconds(_)), From 1ec3a4a338a6f5c5ba9056946ab9747fe7dc0a3d Mon Sep 17 00:00:00 2001 From: Jesse Cai Date: Fri, 12 Jul 2019 16:16:35 -0700 Subject: [PATCH 5/5] Add test for the default value --- core/src/test/scala/org/apache/spark/SparkConfSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 443244386e692..202b85dcf5695 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -392,6 +392,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst test("SPARK-28355: Use Spark conf for threshold at which UDFs are compressed by broadcast") { val conf = new SparkConf() + // Check the default value + assert(conf.get(BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD) === 1L * 1024 * 1024) + // Set the conf conf.set(BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD, 1L * 1024)