diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8819e73d17fb2..28864f2620411 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -552,17 +552,10 @@ class SparkContext(config: SparkConf) extends Logging { valueClass: Class[V], minPartitions: Int = defaultMinPartitions ): RDD[(K, V)] = { - // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. - val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration)) - val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) - new HadoopRDD( - this, - confBroadcast, - Some(setInputPathsFunc), - inputFormatClass, - keyClass, - valueClass, - minPartitions).setName(path) + val jobConf = new JobConf(hadoopConfiguration) + FileInputFormat.setInputPaths(jobConf, path) + hadoopRDD(jobConf, inputFormatClass, keyClass, valueClass, minPartitions) + } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 041028514399b..ca77891ef5bec 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -84,7 +84,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp * variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job. * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration. * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD - * creates. + * creates. * @param inputFormatClass Storage format of the data to be read. * @param keyClass Class of the key associated with the inputFormatClass. * @param valueClass Class of the value associated with the inputFormatClass. @@ -93,7 +93,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp @DeveloperApi class HadoopRDD[K, V]( sc: SparkContext, - broadcastedConf: Broadcast[SerializableWritable[Configuration]], + broadcastedConf: Broadcast[SerializableWritable[JobConf]], initLocalJobConfFuncOpt: Option[JobConf => Unit], inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], @@ -110,8 +110,7 @@ class HadoopRDD[K, V]( minPartitions: Int) = { this( sc, - sc.broadcast(new SerializableWritable(conf)) - .asInstanceOf[Broadcast[SerializableWritable[Configuration]]], + sc.broadcast(new SerializableWritable(conf)), None /* initLocalJobConfFuncOpt */, inputFormatClass, keyClass, @@ -128,25 +127,16 @@ class HadoopRDD[K, V]( // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. protected def getJobConf(): JobConf = { - val conf: Configuration = broadcastedConf.value.value - if (conf.isInstanceOf[JobConf]) { - // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it. - conf.asInstanceOf[JobConf] - } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { + val conf: JobConf = broadcastedConf.value.value + if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { // getJobConf() has been called previously, so there is already a local cache of the JobConf // needed by this RDD. HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] } else { - // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the - // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). - // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. - // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456) - conf.synchronized { - val newJobConf = new JobConf(conf) - initLocalJobConfFuncOpt.map(f => f(newJobConf)) - HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) - newJobConf + initLocalJobConfFuncOpt.synchronized { + initLocalJobConfFuncOpt.map(f => f(conf)) } + conf } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 8cfde46186ca4..c1ef84059b6a0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -58,11 +58,9 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon private val _broadcastedHiveConf = sc.sparkContext.broadcast(new SerializableWritable(sc.hiveconf)) - - def broadcastedHiveConf = _broadcastedHiveConf - - def hiveConf = _broadcastedHiveConf.value.value - + private val _broadcastedJobConf = + sc.sparkContext.broadcast(new SerializableWritable(new JobConf(sc.hiveconf. + asInstanceOf[Configuration]))) override def makeRDDForTable(hiveTable: HiveTable): RDD[_] = makeRDDForTable( hiveTable, @@ -208,10 +206,9 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = { val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _ - val rdd = new HadoopRDD( sc.sparkContext, - _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]], + _broadcastedJobConf.asInstanceOf[Broadcast[SerializableWritable[JobConf]]], Some(initializeJobConfFunc), inputFormatClass, classOf[Writable],