From bb918144e2671193970a978241f2334107c90fe4 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Thu, 21 Nov 2019 20:21:15 -0600 Subject: [PATCH 1/7] add wildcard location --- .../apache/spark/scheduler/TaskLocation.scala | 14 +++++- .../spark/scheduler/TaskSetManager.scala | 47 ++++++++++--------- .../scheduler/TaskSchedulerImplSuite.scala | 38 +++++++++++++++ 3 files changed, 77 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala index 06b52935c696c..797e9c7bf767a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala @@ -49,6 +49,16 @@ private [spark] case class HDFSCacheTaskLocation(override val host: String) exte override def toString: String = TaskLocation.inMemoryLocationTag + host } +/** + * A location that can match any host. This can be used as the last location in the list of + * preferred locations to indicate that the task can be assigned to any host if it cannot get any + * desired location immediately. + */ +private [spark] case class WildcardLocation() extends TaskLocation { + override val host: String = "*" + override def toString: String = host +} + private[spark] object TaskLocation { // We identify hosts on which the block is cached with this prefix. Because this prefix contains // underscores, which are not legal characters in hostnames, there should be no potential for @@ -70,7 +80,9 @@ private[spark] object TaskLocation { def apply(str: String): TaskLocation = { val hstr = str.stripPrefix(inMemoryLocationTag) if (hstr.equals(str)) { - if (str.startsWith(executorLocationTag)) { + if (str == "*") { + new WildcardLocation() + } else if (str.startsWith(executorLocationTag)) { val hostAndExecutorId = str.stripPrefix(executorLocationTag) val splits = hostAndExecutorId.split("_", 2) require(splits.length == 2, "Illegal executor location format: " + str) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 5c0bc497dd1b3..22e10467eaca8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -218,28 +218,33 @@ private[spark] class TaskSetManager( speculatable: Boolean = false): Unit = { val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks for (loc <- tasks(index).preferredLocations) { - loc match { - case e: ExecutorCacheTaskLocation => - pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index - case e: HDFSCacheTaskLocation => - val exe = sched.getExecutorsAliveOnHost(loc.host) - exe match { - case Some(set) => - for (e <- set) { - pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index - } - logInfo(s"Pending task $index has a cached location at ${e.host} " + - ", where there are executors " + set.mkString(",")) - case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + - ", but there are no executors alive there.") - } - case _ => - } - pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index + if (loc.isInstanceOf[WildcardLocation]) { + pendingTaskSetToAddTo.noPrefs += index + } else { + loc match { + case e: ExecutorCacheTaskLocation => + pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += + index + case e: HDFSCacheTaskLocation => + val exe = sched.getExecutorsAliveOnHost(loc.host) + exe match { + case Some(set) => + for (e <- set) { + pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index + } + logInfo(s"Pending task $index has a cached location at ${e.host} " + + ", where there are executors " + set.mkString(",")) + case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + + ", but there are no executors alive there.") + } + case _ => + } + pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index - if (resolveRacks) { - sched.getRackForHost(loc.host).foreach { rack => - pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index + if (resolveRacks) { + sched.getRackForHost(loc.host).foreach { rack => + pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index + } } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index e7ecf847ff4f4..307a22e7ad0b4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1289,6 +1289,44 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(ArrayBuffer("1") === taskDescriptions(1).resources.get(GPU).get.addresses) } + test("Tasks with wildcard location can run immediately if preferred location not available") { + val conf = new SparkConf() + .set(config.LOCALITY_WAIT.key, "3s") + sc = new SparkContext("local", "TaskSchedulerImplSuite", conf) + + // we create a manual clock just so we can be sure the clock doesn't advance at all in this test + val clock = new ManualClock() + val taskScheduler = new TaskSchedulerImpl(sc) { + override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { + new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock) + } + } + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. + new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {} + override def executorAdded(execId: String, host: String): Unit = {} + } + taskScheduler.initialize(new FakeSchedulerBackend) + // make an offer on the preferred host so the scheduler knows its alive. This is necessary + // so that the taskset knows that it *could* take advantage of locality. + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1))) + + // Submit a taskset with locality preferences. + val taskSet = FakeTask.createTaskSet( + 1, stageId = 1, stageAttemptId = 0, Seq(TaskLocation("host1", "exec1"), TaskLocation("*"))) + taskScheduler.submitTasks(taskSet) + val tsm = taskScheduler.taskSetManagerForAttempt(1, 0).get + // make sure we've setup our test correctly, so that the taskset knows it *could* use local + // offers. + assert(tsm.myLocalityLevels.contains(TaskLocality.NODE_LOCAL)) + // make an offer on a non-preferred location. Since the delay is 0, we should still schedule + // immediately. + val taskDescs = + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec2", "host2", 1))).flatten + assert(taskDescs.size === 1) + assert(taskDescs.head.executorId === "exec2") + } + /** * Used by tests to simulate a task failure. This calls the failure handler explicitly, to ensure * that all the state is updated when this method returns. Otherwise, there's no way to know when From b64044899f0ccc2f56b6bed8b85422c857f4e56e Mon Sep 17 00:00:00 2001 From: maryannxue Date: Mon, 25 Nov 2019 10:39:49 -0600 Subject: [PATCH 2/7] improve test case --- .../scheduler/TaskSchedulerImplSuite.scala | 38 ------------------- .../spark/scheduler/TaskSetManagerSuite.scala | 18 +++++++++ 2 files changed, 18 insertions(+), 38 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 307a22e7ad0b4..e7ecf847ff4f4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1289,44 +1289,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(ArrayBuffer("1") === taskDescriptions(1).resources.get(GPU).get.addresses) } - test("Tasks with wildcard location can run immediately if preferred location not available") { - val conf = new SparkConf() - .set(config.LOCALITY_WAIT.key, "3s") - sc = new SparkContext("local", "TaskSchedulerImplSuite", conf) - - // we create a manual clock just so we can be sure the clock doesn't advance at all in this test - val clock = new ManualClock() - val taskScheduler = new TaskSchedulerImpl(sc) { - override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { - new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock) - } - } - // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. - new DAGScheduler(sc, taskScheduler) { - override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {} - override def executorAdded(execId: String, host: String): Unit = {} - } - taskScheduler.initialize(new FakeSchedulerBackend) - // make an offer on the preferred host so the scheduler knows its alive. This is necessary - // so that the taskset knows that it *could* take advantage of locality. - taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1))) - - // Submit a taskset with locality preferences. - val taskSet = FakeTask.createTaskSet( - 1, stageId = 1, stageAttemptId = 0, Seq(TaskLocation("host1", "exec1"), TaskLocation("*"))) - taskScheduler.submitTasks(taskSet) - val tsm = taskScheduler.taskSetManagerForAttempt(1, 0).get - // make sure we've setup our test correctly, so that the taskset knows it *could* use local - // offers. - assert(tsm.myLocalityLevels.contains(TaskLocality.NODE_LOCAL)) - // make an offer on a non-preferred location. Since the delay is 0, we should still schedule - // immediately. - val taskDescs = - taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec2", "host2", 1))).flatten - assert(taskDescs.size === 1) - assert(taskDescs.head.executorId === "exec2") - } - /** * Used by tests to simulate a task failure. This calls the failure handler explicitly, to ensure * that all the state is updated when this method returns. Otherwise, there's no way to know when diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 34bcae8abd512..09767c3c98209 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1796,4 +1796,22 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg manager.handleFailedTask(offerResult.get.taskId, TaskState.FAILED, reason) assert(sched.taskSetsFailed.contains(taskSet.id)) } + + test("Tasks with wildcard location can run immediately if preferred location not available") { + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3")) + val taskSet = FakeTask.createTaskSet(3, + Seq(TaskLocation("host1"), TaskLocation("*")), + Seq(TaskLocation("host1"), TaskLocation("*")), + Seq(TaskLocation("host2"), TaskLocation("*")) + ) + val clock = new ManualClock + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index === 0) + // Second task is not scheduled as it does not satisfy locality level. + assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).get.index === 2) + assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isEmpty) + // Second task is scheduled immediately on a non-preferred host with NO_PREF locality level. + assert(manager.resourceOffer("exec3", "host3", NO_PREF).get.index === 1) + } } From 72a946cb5649a08a8bfc8de03924fd95349347e1 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Mon, 25 Nov 2019 10:48:13 -0600 Subject: [PATCH 3/7] address review comments --- .../main/scala/org/apache/spark/scheduler/TaskLocation.scala | 4 ++-- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala index 797e9c7bf767a..2fec089c6698f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala @@ -54,7 +54,7 @@ private [spark] case class HDFSCacheTaskLocation(override val host: String) exte * preferred locations to indicate that the task can be assigned to any host if it cannot get any * desired location immediately. */ -private [spark] case class WildcardLocation() extends TaskLocation { +private [spark] case object WildcardLocation extends TaskLocation { override val host: String = "*" override def toString: String = host } @@ -81,7 +81,7 @@ private[spark] object TaskLocation { val hstr = str.stripPrefix(inMemoryLocationTag) if (hstr.equals(str)) { if (str == "*") { - new WildcardLocation() + WildcardLocation } else if (str.startsWith(executorLocationTag)) { val hostAndExecutorId = str.stripPrefix(executorLocationTag) val splits = hostAndExecutorId.split("_", 2) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 22e10467eaca8..55d45c59830b5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -218,7 +218,7 @@ private[spark] class TaskSetManager( speculatable: Boolean = false): Unit = { val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks for (loc <- tasks(index).preferredLocations) { - if (loc.isInstanceOf[WildcardLocation]) { + if (loc == WildcardLocation) { pendingTaskSetToAddTo.noPrefs += index } else { loc match { From 906f413c6b85065b7651bf85d2e3c47ca25bd1e5 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Tue, 3 Dec 2019 14:04:10 -0600 Subject: [PATCH 4/7] Address review comments --- .../apache/spark/scheduler/DAGScheduler.scala | 28 +++++++++++++++---- .../apache/spark/scheduler/TaskLocation.scala | 13 +++++---- .../spark/scheduler/TaskSetManagerSuite.scala | 6 ++-- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index fe3a48440991a..b5a2231b696f5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1136,11 +1136,11 @@ private[spark] class DAGScheduler( val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { stage match { case s: ShuffleMapStage => - partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap + partitionsToCompute.map { id => (id, getPreferredLocsInternal(stage.rdd, id))}.toMap case s: ResultStage => partitionsToCompute.map { id => val p = s.partitions(id) - (id, getPreferredLocs(stage.rdd, p)) + (id, getPreferredLocsInternal(stage.rdd, p)) }.toMap } } catch { @@ -1152,7 +1152,8 @@ private[spark] class DAGScheduler( return } - stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq) + val taskLocalityPrefs = taskIdToLocations.values.map(_.filter(_ != WildcardLocation)).toSeq + stage.makeNewStageAttempt(partitionsToCompute.size, taskLocalityPrefs) // If there are tasks to execute, record the submission time of the stage. Otherwise, // post the even without the submission time, which indicates that this stage was @@ -2054,7 +2055,7 @@ private[spark] class DAGScheduler( /** * Gets the locality information associated with a partition of a particular RDD. * - * This method is thread-safe and is called from both DAGScheduler and SparkContext. + * This method is thread-safe and is called from SparkContext. * * @param rdd whose partitions are to be looked at * @param partition to lookup locality information for @@ -2062,6 +2063,20 @@ private[spark] class DAGScheduler( */ private[spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = { + getPreferredLocsInternal(rdd, partition).filter(_ != WildcardLocation) + } + + /** + * Gets the locality information associated with a partition of a particular RDD, which may + * include a [[WildcardLocation]]. + * + * This method is thread-safe and is called from DAGScheduler only. + * + * @param rdd whose partitions are to be looked at + * @param partition to lookup locality information for + * @return list of machines that are preferred by the partition + */ + private def getPreferredLocsInternal(rdd: RDD[_], partition: Int): Seq[TaskLocation] = { getPreferredLocsInternal(rdd, partition, new HashSet) } @@ -2090,7 +2105,10 @@ private[spark] class DAGScheduler( // If the RDD has some placement preferences (as is the case for input RDDs), get those val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList if (rddPrefs.nonEmpty) { - return rddPrefs.map(TaskLocation(_)) + return rddPrefs.map { + case WildcardLocation.host => WildcardLocation + case host => TaskLocation(host) + } } // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala index 2fec089c6698f..646511de826cc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala @@ -50,9 +50,12 @@ private [spark] case class HDFSCacheTaskLocation(override val host: String) exte } /** - * A location that can match any host. This can be used as the last location in the list of - * preferred locations to indicate that the task can be assigned to any host if it cannot get any - * desired location immediately. + * A location that can match any host. This can be used as one of the locations in + * [[org.apache.spark.rdd.RDD.getPreferredLocations]] to indicate that the task can be assigned to + * any host if none of the other desired locations can be satisfied immediately. + * + * Note that this location is only used to skip delayed scheduling in DAGScheduler, and + * [[DAGScheduler.getPreferredLocs]] does not contain this location. */ private [spark] case object WildcardLocation extends TaskLocation { override val host: String = "*" @@ -80,9 +83,7 @@ private[spark] object TaskLocation { def apply(str: String): TaskLocation = { val hstr = str.stripPrefix(inMemoryLocationTag) if (hstr.equals(str)) { - if (str == "*") { - WildcardLocation - } else if (str.startsWith(executorLocationTag)) { + if (str.startsWith(executorLocationTag)) { val hostAndExecutorId = str.stripPrefix(executorLocationTag) val splits = hostAndExecutorId.split("_", 2) require(splits.length == 2, "Illegal executor location format: " + str) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 09767c3c98209..5a4c6afb417bb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1801,9 +1801,9 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3")) val taskSet = FakeTask.createTaskSet(3, - Seq(TaskLocation("host1"), TaskLocation("*")), - Seq(TaskLocation("host1"), TaskLocation("*")), - Seq(TaskLocation("host2"), TaskLocation("*")) + Seq(TaskLocation("host1"), WildcardLocation), + Seq(TaskLocation("host1"), WildcardLocation), + Seq(TaskLocation("host2"), WildcardLocation) ) val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) From 674fd450fe6cee20ff77dafdf4b96e4e97c674d2 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Tue, 3 Dec 2019 14:16:26 -0600 Subject: [PATCH 5/7] code refine --- .../spark/scheduler/TaskSetManager.scala | 52 +++++++++---------- 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 55d45c59830b5..b3e0ecf8f1bf6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -217,39 +217,35 @@ private[spark] class TaskSetManager( resolveRacks: Boolean = true, speculatable: Boolean = false): Unit = { val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks - for (loc <- tasks(index).preferredLocations) { - if (loc == WildcardLocation) { - pendingTaskSetToAddTo.noPrefs += index - } else { - loc match { - case e: ExecutorCacheTaskLocation => - pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += - index - case e: HDFSCacheTaskLocation => - val exe = sched.getExecutorsAliveOnHost(loc.host) - exe match { - case Some(set) => - for (e <- set) { - pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index - } - logInfo(s"Pending task $index has a cached location at ${e.host} " + - ", where there are executors " + set.mkString(",")) - case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + - ", but there are no executors alive there.") - } - case _ => - } - pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index - - if (resolveRacks) { - sched.getRackForHost(loc.host).foreach { rack => - pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index + val preferredLocations = tasks(index).preferredLocations + for (loc <- preferredLocations.filter(_ != WildcardLocation)) { + loc match { + case e: ExecutorCacheTaskLocation => + pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index + case e: HDFSCacheTaskLocation => + val exe = sched.getExecutorsAliveOnHost(loc.host) + exe match { + case Some(set) => + for (e <- set) { + pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index + } + logInfo(s"Pending task $index has a cached location at ${e.host} " + + ", where there are executors " + set.mkString(",")) + case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + + ", but there are no executors alive there.") } + case _ => + } + pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index + + if (resolveRacks) { + sched.getRackForHost(loc.host).foreach { rack => + pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index } } } - if (tasks(index).preferredLocations == Nil) { + if (preferredLocations == Nil || preferredLocations.contains(WildcardLocation)) { pendingTaskSetToAddTo.noPrefs += index } From 78a123a4c1a3de9fd1fad825e6f1710f478cd8b2 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Tue, 3 Dec 2019 14:24:55 -0600 Subject: [PATCH 6/7] refine code comment --- .../scala/org/apache/spark/scheduler/TaskLocation.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala index 646511de826cc..b0d925e5f4406 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala @@ -54,8 +54,11 @@ private [spark] case class HDFSCacheTaskLocation(override val host: String) exte * [[org.apache.spark.rdd.RDD.getPreferredLocations]] to indicate that the task can be assigned to * any host if none of the other desired locations can be satisfied immediately. * - * Note that this location is only used to skip delayed scheduling in DAGScheduler, and - * [[DAGScheduler.getPreferredLocs]] does not contain this location. + * This location is only used internally by DAGScheduler to skip delayed scheduling for individual + * RDDs. [[DAGScheduler.getPreferredLocs]] does not contain this location. + * + * @note This class is experimental and may be replaced by a more complete solution for delayed + * scheduling. */ private [spark] case object WildcardLocation extends TaskLocation { override val host: String = "*" From e821543aba19be1ebc66242cc768cf04c60300f6 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Tue, 3 Dec 2019 21:18:30 -0600 Subject: [PATCH 7/7] fix javadoc build error --- .../scala/org/apache/spark/scheduler/TaskLocation.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala index b0d925e5f4406..9f97daf2876aa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala @@ -51,11 +51,11 @@ private [spark] case class HDFSCacheTaskLocation(override val host: String) exte /** * A location that can match any host. This can be used as one of the locations in - * [[org.apache.spark.rdd.RDD.getPreferredLocations]] to indicate that the task can be assigned to - * any host if none of the other desired locations can be satisfied immediately. + * `RDD.getPreferredLocations` to indicate that the task can be assigned to any host if none of + * the other desired locations can be satisfied immediately. * * This location is only used internally by DAGScheduler to skip delayed scheduling for individual - * RDDs. [[DAGScheduler.getPreferredLocs]] does not contain this location. + * RDDs. `DAGScheduler.getPreferredLocs` does not contain this location. * * @note This class is experimental and may be replaced by a more complete solution for delayed * scheduling.