From c691f3d1cdb770aaafbb025a52c49233cb13c9f7 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Sat, 12 Jul 2014 15:59:04 +0800 Subject: [PATCH 1/9] use SparkContext.hadoopRDD() instead of instantiate HadoopRDD directly in SparkContext.hadoopFile --- .../scala/org/apache/spark/SparkContext.scala | 15 +++-------- .../org/apache/spark/rdd/HadoopRDD.scala | 27 ++++--------------- .../apache/spark/sql/hive/TableReader.scala | 19 +++---------- 3 files changed, 13 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8819e73d17fb2..28864f2620411 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -552,17 +552,10 @@ class SparkContext(config: SparkConf) extends Logging { valueClass: Class[V], minPartitions: Int = defaultMinPartitions ): RDD[(K, V)] = { - // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. - val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration)) - val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) - new HadoopRDD( - this, - confBroadcast, - Some(setInputPathsFunc), - inputFormatClass, - keyClass, - valueClass, - minPartitions).setName(path) + val jobConf = new JobConf(hadoopConfiguration) + FileInputFormat.setInputPaths(jobConf, path) + hadoopRDD(jobConf, inputFormatClass, keyClass, valueClass, minPartitions) + } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 041028514399b..cb22d340ca3a9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -83,8 +83,6 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed * variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job. * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration. - * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD - * creates. * @param inputFormatClass Storage format of the data to be read. * @param keyClass Class of the key associated with the inputFormatClass. * @param valueClass Class of the value associated with the inputFormatClass. @@ -93,8 +91,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp @DeveloperApi class HadoopRDD[K, V]( sc: SparkContext, - broadcastedConf: Broadcast[SerializableWritable[Configuration]], - initLocalJobConfFuncOpt: Option[JobConf => Unit], + broadcastedConf: Broadcast[SerializableWritable[JobConf]], inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], @@ -110,9 +107,7 @@ class HadoopRDD[K, V]( minPartitions: Int) = { this( sc, - sc.broadcast(new SerializableWritable(conf)) - .asInstanceOf[Broadcast[SerializableWritable[Configuration]]], - None /* initLocalJobConfFuncOpt */, + sc.broadcast(new SerializableWritable(conf)), inputFormatClass, keyClass, valueClass, @@ -128,25 +123,13 @@ class HadoopRDD[K, V]( // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. protected def getJobConf(): JobConf = { - val conf: Configuration = broadcastedConf.value.value - if (conf.isInstanceOf[JobConf]) { - // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it. - conf.asInstanceOf[JobConf] - } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { + val conf: JobConf = broadcastedConf.value.value + if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { // getJobConf() has been called previously, so there is already a local cache of the JobConf // needed by this RDD. HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] } else { - // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the - // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). - // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. - // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456) - conf.synchronized { - val newJobConf = new JobConf(conf) - initLocalJobConfFuncOpt.map(f => f(newJobConf)) - HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) - newJobConf - } + conf } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 8cfde46186ca4..9a814825f2bc0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -59,10 +59,6 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon private val _broadcastedHiveConf = sc.sparkContext.broadcast(new SerializableWritable(sc.hiveconf)) - def broadcastedHiveConf = _broadcastedHiveConf - - def hiveConf = _broadcastedHiveConf.value.value - override def makeRDDForTable(hiveTable: HiveTable): RDD[_] = makeRDDForTable( hiveTable, @@ -206,17 +202,10 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon tableDesc: TableDesc, path: String, inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = { - - val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _ - - val rdd = new HadoopRDD( - sc.sparkContext, - _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]], - Some(initializeJobConfFunc), - inputFormatClass, - classOf[Writable], - classOf[Writable], - _minSplitsPerRDD) + val jobConf = new JobConf(_broadcastedHiveConf.value.value.asInstanceOf[Configuration]) + HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc)(jobConf) + val rdd = sc.sparkContext.hadoopRDD(jobConf, inputFormatClass, classOf[Writable], + classOf[Writable],_minSplitsPerRDD) // Only take the value (skip the key) because Hive works only with values. rdd.map(_._2) From 94b3fa2e01b9e46d8cef1c75d0196cd9b4ec75d0 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Mon, 14 Jul 2014 16:55:46 +0800 Subject: [PATCH 2/9] use one broadcast for hive partitioning --- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 6 +++++- .../apache/spark/sql/hive/TableReader.scala | 18 +++++++++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index cb22d340ca3a9..6c78bbd289b01 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -92,6 +92,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp class HadoopRDD[K, V]( sc: SparkContext, broadcastedConf: Broadcast[SerializableWritable[JobConf]], + initLocalJobConfFuncOpt: Option[JobConf => Unit], inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], @@ -108,6 +109,7 @@ class HadoopRDD[K, V]( this( sc, sc.broadcast(new SerializableWritable(conf)), + None /* initLocalJobConfFuncOpt */, inputFormatClass, keyClass, valueClass, @@ -129,7 +131,9 @@ class HadoopRDD[K, V]( // needed by this RDD. HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] } else { - conf + conf.synchronized { + initLocalJobConfFuncOpt.map(f => f(conf)) + } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 9a814825f2bc0..c1ef84059b6a0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -58,7 +58,9 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon private val _broadcastedHiveConf = sc.sparkContext.broadcast(new SerializableWritable(sc.hiveconf)) - + private val _broadcastedJobConf = + sc.sparkContext.broadcast(new SerializableWritable(new JobConf(sc.hiveconf. + asInstanceOf[Configuration]))) override def makeRDDForTable(hiveTable: HiveTable): RDD[_] = makeRDDForTable( hiveTable, @@ -202,10 +204,16 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon tableDesc: TableDesc, path: String, inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = { - val jobConf = new JobConf(_broadcastedHiveConf.value.value.asInstanceOf[Configuration]) - HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc)(jobConf) - val rdd = sc.sparkContext.hadoopRDD(jobConf, inputFormatClass, classOf[Writable], - classOf[Writable],_minSplitsPerRDD) + + val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _ + val rdd = new HadoopRDD( + sc.sparkContext, + _broadcastedJobConf.asInstanceOf[Broadcast[SerializableWritable[JobConf]]], + Some(initializeJobConfFunc), + inputFormatClass, + classOf[Writable], + classOf[Writable], + _minSplitsPerRDD) // Only take the value (skip the key) because Hive works only with values. rdd.map(_._2) From 5d8265b08c6559bc7b7a3c44e312690ad33ff214 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Mon, 14 Jul 2014 20:44:56 +0800 Subject: [PATCH 3/9] fix return bug --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 6c78bbd289b01..1fe875633dc8a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -134,6 +134,7 @@ class HadoopRDD[K, V]( conf.synchronized { initLocalJobConfFuncOpt.map(f => f(conf)) } + conf } } From 754f68bc6e947bb1e086ec5fba42d0c3c5bcb588 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Mon, 14 Jul 2014 21:12:52 +0800 Subject: [PATCH 4/9] sync initLocalJobConfFuncOpt --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 1fe875633dc8a..e13bd005eeab8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -131,7 +131,7 @@ class HadoopRDD[K, V]( // needed by this RDD. HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] } else { - conf.synchronized { + initLocalJobConfFuncOpt.synchronized { initLocalJobConfFuncOpt.map(f => f(conf)) } conf From 1f0b4ecdfd06b75b9d8531e141db95700cc523f0 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Mon, 14 Jul 2014 21:40:28 +0800 Subject: [PATCH 5/9] first get jobConfFunc --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index e13bd005eeab8..1b71feddd6c5d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -126,13 +126,15 @@ class HadoopRDD[K, V]( // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. protected def getJobConf(): JobConf = { val conf: JobConf = broadcastedConf.value.value + val f: (JobConf => Unit) = {} + val jobConfFunc = initLocalJobConfFuncOpt.getOrElse(f) if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { // getJobConf() has been called previously, so there is already a local cache of the JobConf // needed by this RDD. HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] } else { - initLocalJobConfFuncOpt.synchronized { - initLocalJobConfFuncOpt.map(f => f(conf)) + conf.synchronized { + jobConfFunc(conf) } conf } From 659df51a1239cee34a32bcdba7227fbb74a8b06a Mon Sep 17 00:00:00 2001 From: w00228970 Date: Mon, 14 Jul 2014 21:43:35 +0800 Subject: [PATCH 6/9] fix f --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 1b71feddd6c5d..c276f6271186c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -126,7 +126,7 @@ class HadoopRDD[K, V]( // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. protected def getJobConf(): JobConf = { val conf: JobConf = broadcastedConf.value.value - val f: (JobConf => Unit) = {} + def f(jc: JobConf) = {} val jobConfFunc = initLocalJobConfFuncOpt.getOrElse(f) if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { // getJobConf() has been called previously, so there is already a local cache of the JobConf From 9967588859ce7227c39d8dd51f97a35673d09acd Mon Sep 17 00:00:00 2001 From: w00228970 Date: Mon, 14 Jul 2014 21:46:02 +0800 Subject: [PATCH 7/9] fix fun --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index c276f6271186c..ce34832d6035d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -127,7 +127,7 @@ class HadoopRDD[K, V]( protected def getJobConf(): JobConf = { val conf: JobConf = broadcastedConf.value.value def f(jc: JobConf) = {} - val jobConfFunc = initLocalJobConfFuncOpt.getOrElse(f) + val jobConfFunc = initLocalJobConfFuncOpt.getOrElse(f _) if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { // getJobConf() has been called previously, so there is already a local cache of the JobConf // needed by this RDD. From afbaae7b73d9959eaea167bf106ceb85752aeef5 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Mon, 14 Jul 2014 22:11:18 +0800 Subject: [PATCH 8/9] add notation --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index ce34832d6035d..4967fb9d9dab8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -83,6 +83,8 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed * variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job. * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration. + * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD + * creates. * @param inputFormatClass Storage format of the data to be read. * @param keyClass Class of the key associated with the inputFormatClass. * @param valueClass Class of the value associated with the inputFormatClass. From 1c85a3a3bd09f5a5c6bc7da8571392d38a93e669 Mon Sep 17 00:00:00 2001 From: wangfei Date: Mon, 14 Jul 2014 22:21:59 +0800 Subject: [PATCH 9/9] =?UTF-8?q?bad=20commit=EF=BC=8C=20return=20back?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 4967fb9d9dab8..ca77891ef5bec 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -84,7 +84,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp * variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job. * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration. * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD - * creates. + * creates. * @param inputFormatClass Storage format of the data to be read. * @param keyClass Class of the key associated with the inputFormatClass. * @param valueClass Class of the value associated with the inputFormatClass. @@ -128,15 +128,13 @@ class HadoopRDD[K, V]( // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. protected def getJobConf(): JobConf = { val conf: JobConf = broadcastedConf.value.value - def f(jc: JobConf) = {} - val jobConfFunc = initLocalJobConfFuncOpt.getOrElse(f _) if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { // getJobConf() has been called previously, so there is already a local cache of the JobConf // needed by this RDD. HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] } else { - conf.synchronized { - jobConfFunc(conf) + initLocalJobConfFuncOpt.synchronized { + initLocalJobConfFuncOpt.map(f => f(conf)) } conf }