Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -552,17 +552,10 @@ class SparkContext(config: SparkConf) extends Logging {
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
val jobConf = new JobConf(hadoopConfiguration)
FileInputFormat.setInputPaths(jobConf, path)
hadoopRDD(jobConf, inputFormatClass, keyClass, valueClass, minPartitions)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know if there was ever a reason we broadcasted the Configuration and this lambda, just to turn it into a JobConf on the Executor side? Was it really just extra work for no reason?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. See my comment below.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no extra reason for broascast Configuration. I think it is because the old api here Instantiate HadoopRDD directly and HadoopRDD's construct method need broadcastedConf: Broadcast[SerializableWritable[Configuration]], so here broadcasted the Configuration.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is. See my comment below.


}

/**
Expand Down
26 changes: 8 additions & 18 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
* Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
* @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
* creates.
* creates.
* @param inputFormatClass Storage format of the data to be read.
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
Expand All @@ -93,7 +93,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
@DeveloperApi
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
broadcastedConf: Broadcast[SerializableWritable[JobConf]],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
Expand All @@ -110,8 +110,7 @@ class HadoopRDD[K, V](
minPartitions: Int) = {
this(
sc,
sc.broadcast(new SerializableWritable(conf))
.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
sc.broadcast(new SerializableWritable(conf)),
None /* initLocalJobConfFuncOpt */,
inputFormatClass,
keyClass,
Expand All @@ -128,25 +127,16 @@ class HadoopRDD[K, V](

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
if (conf.isInstanceOf[JobConf]) {
// A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
conf.asInstanceOf[JobConf]
} else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
val conf: JobConf = broadcastedConf.value.value
if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this check? I think this may have been the only place we put the JobConf inside the cache.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, there is no need to cache the jobconf if it is in broadcast

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes,i agree with you. broadcastedConf is cached by blockManager in Broadcast

// getJobConf() has been called previously, so there is already a local cache of the JobConf
// needed by this RDD.
HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
// Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
// synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456)
conf.synchronized {
val newJobConf = new JobConf(conf)
initLocalJobConfFuncOpt.map(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
newJobConf
initLocalJobConfFuncOpt.synchronized {
initLocalJobConfFuncOpt.map(f => f(conf))
}
conf
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,9 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon

private val _broadcastedHiveConf =
sc.sparkContext.broadcast(new SerializableWritable(sc.hiveconf))

def broadcastedHiveConf = _broadcastedHiveConf

def hiveConf = _broadcastedHiveConf.value.value

private val _broadcastedJobConf =
sc.sparkContext.broadcast(new SerializableWritable(new JobConf(sc.hiveconf.
asInstanceOf[Configuration])))
override def makeRDDForTable(hiveTable: HiveTable): RDD[_] =
makeRDDForTable(
hiveTable,
Expand Down Expand Up @@ -208,10 +206,9 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {

val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

val rdd = new HadoopRDD(
sc.sparkContext,
_broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
_broadcastedJobConf.asInstanceOf[Broadcast[SerializableWritable[JobConf]]],
Some(initializeJobConfFunc),
inputFormatClass,
classOf[Writable],
Expand Down