From 4d95bf67ea865e98997160e8653775a606df73f6 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 15 May 2019 14:43:22 -0700 Subject: [PATCH 1/8] [SPARK-27963][core] Allow dynamic allocation without a shuffle service. This change adds a new option that enables dynamic allocation without the need for a shuffle service. This mode works by tracking which stages generate shuffle files, and keeping executors that generate data for those shuffles alive while the jobs that use them are active. A separate timeout is also added for shuffle data; so that executors that hold shuffle data can use a separate timeout before being removed because of being idle. This allows the shuffle data to be kept around in case it is needed by some new job, or allow users to be more aggressive in timing out executors that don't have shuffle data in active use. The code also hooks up to the context cleaner so that shuffles that are garbage collected are detected, and the respective executors not held unnecessarily. Testing done with added unit tests, and also with TPC-DS workloads on YARN without a shuffle service. --- .../spark/ExecutorAllocationManager.scala | 16 +- .../scala/org/apache/spark/SparkContext.scala | 20 +- .../spark/internal/config/package.scala | 11 + .../apache/spark/scheduler/StageInfo.scala | 10 +- .../scheduler/dynalloc/ExecutorMonitor.scala | 217 +++++++++++++++++- .../ExecutorAllocationManagerSuite.scala | 2 +- .../dynalloc/ExecutorMonitorSuite.scala | 85 ++++++- 7 files changed, 332 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 63df7ccf46b7..6abff0a5ade5 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -94,6 +94,7 @@ private[spark] class ExecutorAllocationManager( client: ExecutorAllocationClient, listenerBus: LiveListenerBus, conf: SparkConf, + cleaner: Option[ContextCleaner] = None, clock: Clock = new SystemClock()) extends Logging { @@ -148,7 +149,7 @@ private[spark] class ExecutorAllocationManager( // Listener for Spark events that impact the allocation policy val listener = new ExecutorAllocationListener - val executorMonitor = new ExecutorMonitor(conf, client, clock) + val executorMonitor = new ExecutorMonitor(conf, client, listenerBus, clock) // Executor that handles the scheduling task. private val executor = @@ -194,11 +195,13 @@ private[spark] class ExecutorAllocationManager( throw new SparkException( s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!") } - // Require external shuffle service for dynamic allocation - // Otherwise, we may lose shuffle files when killing executors - if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) { - throw new SparkException("Dynamic allocation of executors requires the external " + - "shuffle service. You may enable this through spark.shuffle.service.enabled.") + if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) { + if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING)) { + logWarning("Dynamic allocation without a shuffle service is an experimental feature.") + } else { + throw new SparkException("Dynamic allocation of executors requires the external " + + "shuffle service. You may enable this through spark.shuffle.service.enabled.") + } } if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) { @@ -214,6 +217,7 @@ private[spark] class ExecutorAllocationManager( def start(): Unit = { listenerBus.addToManagementQueue(listener) listenerBus.addToManagementQueue(executorMonitor) + cleaner.foreach(_.attachListener(executorMonitor)) val scheduleTask = new Runnable() { override def run(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 66f8f4190f91..cc92f41d0321 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -578,14 +578,22 @@ class SparkContext(config: SparkConf) extends Logging { None } - // Optionally scale number of executors dynamically based on workload. Exposed for testing. + _cleaner = + if (_conf.get(CLEANER_REFERENCE_TRACKING)) { + Some(new ContextCleaner(this)) + } else { + None + } + _cleaner.foreach(_.start()) + val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf) _executorAllocationManager = if (dynamicAllocationEnabled) { schedulerBackend match { case b: ExecutorAllocationClient => Some(new ExecutorAllocationManager( - schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf)) + schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf, + cleaner = cleaner)) case _ => None } @@ -594,14 +602,6 @@ class SparkContext(config: SparkConf) extends Logging { } _executorAllocationManager.foreach(_.start()) - _cleaner = - if (_conf.get(CLEANER_REFERENCE_TRACKING)) { - Some(new ContextCleaner(this)) - } else { - None - } - _cleaner.foreach(_.start()) - setupAndStartListenerBus() postEnvironmentUpdate() postApplicationStart() diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 90826bb84981..269ba83af955 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -367,6 +367,17 @@ package object config { .checkValue(_ >= 0L, "Timeout must be >= 0.") .createWithDefault(60) + private[spark] val DYN_ALLOCATION_SHUFFLE_TRACKING = + ConfigBuilder("spark.dynamicAllocation.shuffleTracking.enabled") + .booleanConf + .createWithDefault(false) + + private[spark] val DYN_ALLOCATION_SHUFFLE_TIMEOUT = + ConfigBuilder("spark.dynamicAllocation.shuffleTimeout") + .timeConf(TimeUnit.MILLISECONDS) + .checkValue(_ >= 0L, "Timeout must be >= 0.") + .createWithDefault(Long.MaxValue) + private[spark] val DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT = ConfigBuilder("spark.dynamicAllocation.schedulerBacklogTimeout") .timeConf(TimeUnit.SECONDS).createWithDefault(1) diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 33a68f24bd53..e3216151462b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -37,7 +37,8 @@ class StageInfo( val parentIds: Seq[Int], val details: String, val taskMetrics: TaskMetrics = null, - private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) { + private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty, + private[spark] val shuffleDepId: Option[Int] = None) { /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */ var submissionTime: Option[Long] = None /** Time when all tasks in the stage completed or when the stage was cancelled. */ @@ -90,6 +91,10 @@ private[spark] object StageInfo { ): StageInfo = { val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd) val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos + val shuffleDepId = stage match { + case sms: ShuffleMapStage => Option(sms.shuffleDep).map(_.shuffleId) + case _ => None + } new StageInfo( stage.id, attemptId, @@ -99,6 +104,7 @@ private[spark] object StageInfo { stage.parents.map(_.id), stage.details, taskMetrics, - taskLocalityPreferences) + taskLocalityPreferences, + shuffleDepId) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 9aac4d2281ec..839fdea67390 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -36,14 +36,19 @@ import org.apache.spark.util.Clock private[spark] class ExecutorMonitor( conf: SparkConf, client: ExecutorAllocationClient, - clock: Clock) extends SparkListener with Logging { + listenerBus: LiveListenerBus, + clock: Clock) extends SparkListener with CleanerListener with Logging { private val idleTimeoutMs = TimeUnit.SECONDS.toMillis( conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)) private val storageTimeoutMs = TimeUnit.SECONDS.toMillis( conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)) + private val shuffleTimeoutMs = conf.get(DYN_ALLOCATION_SHUFFLE_TIMEOUT) + private val fetchFromShuffleSvcEnabled = conf.get(SHUFFLE_SERVICE_ENABLED) && conf.get(SHUFFLE_SERVICE_FETCH_RDD_ENABLED) + private val shuffleTrackingEnabled = !conf.get(SHUFFLE_SERVICE_ENABLED) && + conf.get(DYN_ALLOCATION_SHUFFLE_TRACKING) private val executors = new ConcurrentHashMap[String, Tracker]() @@ -64,6 +69,26 @@ private[spark] class ExecutorMonitor( private val nextTimeout = new AtomicLong(Long.MaxValue) private var timedOutExecs = Seq.empty[String] + // Active job tracking. + // + // The following state is used when an external shuffle service is not in use, and allows Spark + // to scale down based on whether the shuffle data stored in executors is in use. + // + // The algorithm works as following: when jobs start, some state is kept that tracks which stages + // are part of that job, and which shuffle ID is attached to those stages. As tasks finish, the + // executor tracking code is updated to include the list of shuffles for which it's storing + // shuffle data. + // + // If executors hold shuffle data that is related to an active job, then the executor is + // considered to be in "shuffle busy" state; meaning that the executor is not allowed to be + // removed. If the executor has shuffle data but it doesn't relate to any active job, then it + // may be removed when idle, following the same timeout configuration used for cache blocks. + // + // The following fields are not thread-safe and should be only used from the event thread. + private val shuffleToActiveJobs = new mutable.HashMap[Int, mutable.ArrayBuffer[Int]]() + private val stageToShuffleID = new mutable.HashMap[Int, Int]() + private val jobToStageIDs = new mutable.HashMap[Int, Seq[Int]]() + def reset(): Unit = { executors.clear() nextTimeout.set(Long.MaxValue) @@ -85,7 +110,7 @@ private[spark] class ExecutorMonitor( var newNextTimeout = Long.MaxValue timedOutExecs = executors.asScala - .filter { case (_, exec) => !exec.pendingRemoval } + .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle } .filter { case (_, exec) => val deadline = exec.timeoutAt if (deadline > now) { @@ -124,6 +149,98 @@ private[spark] class ExecutorMonitor( def pendingRemovalCount: Int = executors.asScala.count { case (_, exec) => exec.pendingRemoval } + override def onJobStart(event: SparkListenerJobStart): Unit = { + if (!shuffleTrackingEnabled) { + return + } + + val shuffleStages = event.stageInfos.flatMap { s => + s.shuffleDepId.toSeq.map { shuffleId => + s.stageId -> shuffleId + } + } + + var updateExecutors = false + shuffleStages.foreach { case (sid, shuffle) => + val jobIDs = shuffleToActiveJobs.get(shuffle) match { + case Some(jobs) => + // If a shuffle is being re-used, we need to re-scan the executors and update their + // tracker with the information that the shuffle data they're storing is in use. + logDebug(s"Reusing shuffle $shuffle in job ${event.jobId}.") + updateExecutors = true + jobs + + case _ => + logDebug(s"Registered new shuffle $shuffle (from stage $sid).") + val jobs = new mutable.ArrayBuffer[Int]() + shuffleToActiveJobs(shuffle) = jobs + jobs + } + jobIDs += event.jobId + } + + if (updateExecutors) { + val active = shuffleStages.map(_._2).toSeq + var needTimeoutUpdate = false + executors.asScala.foreach { case (id, exec) => + if (!exec.hasActiveShuffle) { + exec.updateActiveShuffles(active) + if (exec.hasActiveShuffle) { + logDebug(s"Executor $id has data needed by new active job.") + needTimeoutUpdate = true + } + } + } + + if (needTimeoutUpdate) { + nextTimeout.set(Long.MinValue) + } + } + + stageToShuffleID ++= shuffleStages + jobToStageIDs(event.jobId) = shuffleStages.map(_._1).toSeq + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { + if (!shuffleTrackingEnabled) { + return + } + + var updateExecutors = false + val activeShuffles = new mutable.ArrayBuffer[Int]() + shuffleToActiveJobs.foreach { case (shuffleId, jobs) => + jobs -= event.jobId + if (jobs.nonEmpty) { + activeShuffles += shuffleId + } else { + // If a shuffle went idle we need to update all executors to make sure they're correctly + // tracking active shuffles. + updateExecutors = true + } + } + + if (updateExecutors) { + if (log.isDebugEnabled()) { + if (activeShuffles.nonEmpty) { + logDebug( + s"Job ${event.jobId} ended, shuffles ${activeShuffles.mkString(",")} still active.") + } else { + logDebug(s"Job ${event.jobId} ended, no active shuffles remain.") + } + } + + executors.asScala.foreach { case (id, exec) => + if (exec.hasActiveShuffle) { + exec.updateActiveShuffles(activeShuffles) + } + } + } + + jobToStageIDs.remove(event.jobId).foreach { stages => + stages.foreach { id => stageToShuffleID -= id } + } + } + override def onTaskStart(event: SparkListenerTaskStart): Unit = { val executorId = event.taskInfo.executorId // Guard against a late arriving task start event (SPARK-26927). @@ -137,6 +254,21 @@ private[spark] class ExecutorMonitor( val executorId = event.taskInfo.executorId val exec = executors.get(executorId) if (exec != null) { + // If the task succeeded and the stage generates shuffle data, record that this executor + // holds data for the shuffle. Note that this ignores speculation, since this code is not + // directly tied to the map output tracker that knows exactly which shuffle blocks are + // being used. This means that an executor may be marked as having shuffle data even though + // it may not - although that's probably unlikely since it would require a single task for + // a stage to be run on that executor, and that particular task's output to not being used + // (because of another task for the same partition that ran somewhere else "winning"). + if (shuffleTrackingEnabled && event.reason == Success) { + stageToShuffleID.get(event.stageId).foreach { shuffleId => + exec.addShuffle(shuffleId) + } + } + + // Update the number of running tasks after checking for shuffle data, so that the shuffle + // information is up-to-date in case the executor is going idle. exec.updateRunningTasks(-1) } } @@ -171,7 +303,6 @@ private[spark] class ExecutorMonitor( // available. So don't count blocks that can be served by the external service. if (storageLevel.isValid && (!fetchFromShuffleSvcEnabled || !storageLevel.useDisk)) { val hadCachedBlocks = exec.cachedBlocks.nonEmpty - val blocks = exec.cachedBlocks.getOrElseUpdate(blockId.rddId, new mutable.BitSet(blockId.splitIndex)) blocks += blockId.splitIndex @@ -201,6 +332,25 @@ private[spark] class ExecutorMonitor( } } + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case ShuffleCleanedEvent(id) => cleanupShuffle(id) + case _ => + } + + override def rddCleaned(rddId: Int): Unit = { } + + override def shuffleCleaned(shuffleId: Int): Unit = { + // Because this is called in a completely separate thread, we post a custom event to the + // listener bus so that the internal state is safely updated. + listenerBus.post(ShuffleCleanedEvent(shuffleId)) + } + + override def broadcastCleaned(broadcastId: Long): Unit = { } + + override def accumCleaned(accId: Long): Unit = { } + + override def checkpointCleaned(rddId: Long): Unit = { } + // Visible for testing. private[dynalloc] def isExecutorIdle(id: String): Boolean = { Option(executors.get(id)).map(_.isIdle).getOrElse(throw new NoSuchElementException(id)) @@ -209,7 +359,7 @@ private[spark] class ExecutorMonitor( // Visible for testing private[dynalloc] def timedOutExecutors(when: Long): Seq[String] = { executors.asScala.flatMap { case (id, tracker) => - if (tracker.timeoutAt <= when) Some(id) else None + if (tracker.isIdle && tracker.timeoutAt <= when) Some(id) else None }.toSeq } @@ -236,6 +386,14 @@ private[spark] class ExecutorMonitor( } } + private def cleanupShuffle(id: Int): Unit = { + logDebug(s"Cleaning up state related to shuffle $id.") + shuffleToActiveJobs -= id + executors.asScala.foreach { case (_, exec) => + exec.removeShuffle(id) + } + } + private class Tracker { @volatile var timeoutAt: Long = Long.MaxValue @@ -244,6 +402,7 @@ private[spark] class ExecutorMonitor( @volatile var timedOut: Boolean = false var pendingRemoval: Boolean = false + var hasActiveShuffle: Boolean = false private var idleStart: Long = -1 private var runningTasks: Int = 0 @@ -252,8 +411,11 @@ private[spark] class ExecutorMonitor( // This should only be used in the event thread. val cachedBlocks = new mutable.HashMap[Int, mutable.BitSet]() - // For testing. - def isIdle: Boolean = idleStart >= 0 + // The set of shuffles for which shuffle data is held by the executor. + // This should only be used in the event thread. + private val shuffleIds = if (shuffleTrackingEnabled) new mutable.HashSet[Int]() else null + + def isIdle: Boolean = idleStart >= 0 && !hasActiveShuffle def updateRunningTasks(delta: Int): Unit = { runningTasks = math.max(0, runningTasks + delta) @@ -264,7 +426,18 @@ private[spark] class ExecutorMonitor( def updateTimeout(): Unit = { val oldDeadline = timeoutAt val newDeadline = if (idleStart >= 0) { - idleStart + (if (cachedBlocks.nonEmpty) storageTimeoutMs else idleTimeoutMs) + val timeout = if (cachedBlocks.nonEmpty || (shuffleIds != null && shuffleIds.nonEmpty)) { + val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutMs else Long.MaxValue + val _shuffleTimeout = if (shuffleIds != null && shuffleIds.nonEmpty) { + shuffleTimeoutMs + } else { + Long.MaxValue + } + math.min(_cacheTimeout, _shuffleTimeout) + } else { + idleTimeoutMs + } + idleStart + timeout } else { Long.MaxValue } @@ -279,5 +452,35 @@ private[spark] class ExecutorMonitor( updateNextTimeout(newDeadline) } } + + def addShuffle(id: Int): Unit = { + if (shuffleIds.add(id)) { + hasActiveShuffle = true + if (!isIdle) { + updateTimeout() + } + } + } + + def removeShuffle(id: Int): Unit = { + if (shuffleIds.remove(id) && shuffleIds.isEmpty) { + hasActiveShuffle = false + if (isIdle) { + updateTimeout() + } + } + } + + def updateActiveShuffles(ids: Iterable[Int]): Unit = { + val hadActiveShuffle = hasActiveShuffle + hasActiveShuffle = ids.exists(shuffleIds.contains) + if (hadActiveShuffle && isIdle) { + updateTimeout() + } + } + } + + private case class ShuffleCleanedEvent(id: Int) extends SparkListenerEvent { + override protected[spark] def logEvent: Boolean = false } } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 2b75f2e4030a..f79551f5da81 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -986,7 +986,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { private def createManager( conf: SparkConf, clock: Clock = new SystemClock()): ExecutorAllocationManager = { - val manager = new ExecutorAllocationManager(client, listenerBus, conf, clock) + val manager = new ExecutorAllocationManager(client, listenerBus, conf, clock = clock) managers += manager manager.start() manager diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala index 8d1577e835d2..d108d261dc8e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable import org.mockito.ArgumentMatchers.any -import org.mockito.Mockito.{mock, when} +import org.mockito.Mockito.{doAnswer, mock, when} import org.apache.spark._ import org.apache.spark.internal.config._ @@ -34,10 +34,13 @@ class ExecutorMonitorSuite extends SparkFunSuite { private val idleTimeoutMs = TimeUnit.SECONDS.toMillis(60L) private val storageTimeoutMs = TimeUnit.SECONDS.toMillis(120L) + private val shuffleTimeoutMs = TimeUnit.SECONDS.toMillis(240L) private val conf = new SparkConf() .set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "60s") .set(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key, "120s") + .set(DYN_ALLOCATION_SHUFFLE_TIMEOUT.key, "240s") + .set(SHUFFLE_SERVICE_ENABLED, true) private var monitor: ExecutorMonitor = _ private var client: ExecutorAllocationClient = _ @@ -55,7 +58,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { when(client.isExecutorActive(any())).thenAnswer { invocation => knownExecs.contains(invocation.getArguments()(0).asInstanceOf[String]) } - monitor = new ExecutorMonitor(conf, client, clock) + monitor = new ExecutorMonitor(conf, client, null, clock) } test("basic executor timeout") { @@ -205,7 +208,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { assert(monitor.timedOutExecutors(storageDeadline) === Seq("1")) conf.set(SHUFFLE_SERVICE_ENABLED, true).set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true) - monitor = new ExecutorMonitor(conf, client, clock) + monitor = new ExecutorMonitor(conf, client, null, clock) monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.MEMORY_ONLY)) @@ -259,8 +262,84 @@ class ExecutorMonitorSuite extends SparkFunSuite { assert(monitor.timedOutExecutors().toSet === Set("2")) } + test("shuffle block tracking") { + // Mock the listener bus *only* for the functionality needed by the shuffle tracking code. + // Any other event sent through the mock bus will fail. + val bus = mock(classOf[LiveListenerBus]) + doAnswer { invocation => + monitor.onOtherEvent(invocation.getArguments()(0).asInstanceOf[SparkListenerEvent]) + }.when(bus).post(any()) + + conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, false) + monitor = new ExecutorMonitor(conf, client, bus, clock) + + // 3 jobs: 2 and 3 share a shuffle, 1 has a separate shuffle. + val stage1 = stageInfo(1, shuffleId = 0) + val stage2 = stageInfo(2) + + val stage3 = stageInfo(3, shuffleId = 1) + val stage4 = stageInfo(4) + + val stage5 = stageInfo(5, shuffleId = 1) + val stage6 = stageInfo(6) + + // Start jobs 1 and 2. Finish a task on each, but don't finish the jobs. This should prevent the + // executor from going idle since there are active shuffles. + monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage1, stage2))) + monitor.onJobStart(SparkListenerJobStart(2, clock.getTimeMillis(), Seq(stage3, stage4))) + + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + assert(monitor.timedOutExecutors(idleDeadline) === Seq("1")) + + // First a failed task, to make sure it does not count. + monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1))) + monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", TaskResultLost, taskInfo("1", 1), null)) + assert(monitor.timedOutExecutors(idleDeadline) === Seq("1")) + + monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1))) + monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 1), null)) + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + + monitor.onTaskStart(SparkListenerTaskStart(3, 0, taskInfo("1", 1))) + monitor.onTaskEnd(SparkListenerTaskEnd(3, 0, "foo", Success, taskInfo("1", 1), null)) + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + + // Finish the jobs, now the executor should be idle, but with the shuffle timeout, since the + // shuffles are not active. + monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), JobSucceeded)) + assert(!monitor.isExecutorIdle("1")) + + monitor.onJobEnd(SparkListenerJobEnd(2, clock.getTimeMillis(), JobSucceeded)) + assert(monitor.isExecutorIdle("1")) + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + assert(monitor.timedOutExecutors(storageDeadline).isEmpty) + assert(monitor.timedOutExecutors(shuffleDeadline) === Seq("1")) + + // Start job 3. Since it shares a shuffle with job 2, the executor should not be considered + // idle anymore, even if no tasks are run. + monitor.onJobStart(SparkListenerJobStart(3, clock.getTimeMillis(), Seq(stage5, stage6))) + assert(!monitor.isExecutorIdle("1")) + assert(monitor.timedOutExecutors(shuffleDeadline).isEmpty) + + monitor.onJobEnd(SparkListenerJobEnd(3, clock.getTimeMillis(), JobSucceeded)) + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + assert(monitor.timedOutExecutors(shuffleDeadline) === Seq("1")) + + // Clean up the shuffles, executor now should now time out at the idle deadline. + monitor.shuffleCleaned(0) + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + monitor.shuffleCleaned(1) + assert(monitor.timedOutExecutors(idleDeadline) === Seq("1")) + } + private def idleDeadline: Long = clock.getTimeMillis() + idleTimeoutMs + 1 private def storageDeadline: Long = clock.getTimeMillis() + storageTimeoutMs + 1 + private def shuffleDeadline: Long = clock.getTimeMillis() + shuffleTimeoutMs + 1 + + private def stageInfo(id: Int, shuffleId: Int = -1): StageInfo = { + new StageInfo(id, 0, s"stage$id", 1, Nil, Nil, "", + shuffleDepId = if (shuffleId >= 0) Some(shuffleId) else None) + } private def taskInfo( execId: String, From be37e740cab50464d68927d83ecce8078c853caa Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 7 Jun 2019 09:42:13 -0700 Subject: [PATCH 2/8] Fix unit tests. --- .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 6abff0a5ade5..6e25fbf7dac6 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -198,7 +198,7 @@ private[spark] class ExecutorAllocationManager( if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) { if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING)) { logWarning("Dynamic allocation without a shuffle service is an experimental feature.") - } else { + } else if (!testing) { throw new SparkException("Dynamic allocation of executors requires the external " + "shuffle service. You may enable this through spark.shuffle.service.enabled.") } From 8a34caec4305022ce96bd606b57c64c416ead64a Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 24 Jun 2019 15:29:25 -0700 Subject: [PATCH 3/8] Fix comment. --- .../org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 839fdea67390..04d0186d4273 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -82,7 +82,7 @@ private[spark] class ExecutorMonitor( // If executors hold shuffle data that is related to an active job, then the executor is // considered to be in "shuffle busy" state; meaning that the executor is not allowed to be // removed. If the executor has shuffle data but it doesn't relate to any active job, then it - // may be removed when idle, following the same timeout configuration used for cache blocks. + // may be removed when idle, following the shuffle-specific timeout configuration. // // The following fields are not thread-safe and should be only used from the event thread. private val shuffleToActiveJobs = new mutable.HashMap[Int, mutable.ArrayBuffer[Int]]() From bfb5d64254b2241f4eb77a9cf8e92eaf88ad6c78 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 24 Jun 2019 15:38:44 -0700 Subject: [PATCH 4/8] Feedback. --- .../spark/scheduler/dynalloc/ExecutorMonitor.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 04d0186d4273..ff29ab65921f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -161,7 +161,7 @@ private[spark] class ExecutorMonitor( } var updateExecutors = false - shuffleStages.foreach { case (sid, shuffle) => + shuffleStages.foreach { case (stageId, shuffle) => val jobIDs = shuffleToActiveJobs.get(shuffle) match { case Some(jobs) => // If a shuffle is being re-used, we need to re-scan the executors and update their @@ -171,7 +171,7 @@ private[spark] class ExecutorMonitor( jobs case _ => - logDebug(s"Registered new shuffle $shuffle (from stage $sid).") + logDebug(s"Registered new shuffle $shuffle (from stage $stageId).") val jobs = new mutable.ArrayBuffer[Int]() shuffleToActiveJobs(shuffle) = jobs jobs @@ -180,11 +180,11 @@ private[spark] class ExecutorMonitor( } if (updateExecutors) { - val active = shuffleStages.map(_._2).toSeq + val activeShuffleIds = shuffleStages.map(_._2).toSeq var needTimeoutUpdate = false executors.asScala.foreach { case (id, exec) => if (!exec.hasActiveShuffle) { - exec.updateActiveShuffles(active) + exec.updateActiveShuffles(activeShuffleIds) if (exec.hasActiveShuffle) { logDebug(s"Executor $id has data needed by new active job.") needTimeoutUpdate = true @@ -456,9 +456,6 @@ private[spark] class ExecutorMonitor( def addShuffle(id: Int): Unit = { if (shuffleIds.add(id)) { hasActiveShuffle = true - if (!isIdle) { - updateTimeout() - } } } From 9f88e2887525b48283ae000c709fa399b554a33c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 27 Jun 2019 14:34:36 -0700 Subject: [PATCH 5/8] Clarify a comment. --- .../spark/scheduler/dynalloc/ExecutorMonitor.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index ff29ab65921f..cbc584cc4ade 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -255,12 +255,12 @@ private[spark] class ExecutorMonitor( val exec = executors.get(executorId) if (exec != null) { // If the task succeeded and the stage generates shuffle data, record that this executor - // holds data for the shuffle. Note that this ignores speculation, since this code is not - // directly tied to the map output tracker that knows exactly which shuffle blocks are - // being used. This means that an executor may be marked as having shuffle data even though - // it may not - although that's probably unlikely since it would require a single task for - // a stage to be run on that executor, and that particular task's output to not being used - // (because of another task for the same partition that ran somewhere else "winning"). + // holds data for the shuffle. This code will track all executors that generate shuffle + // for the stage, even if speculative tasks generate duplicate shuffle data and end up + // being ignored by the map output tracker. + // + // This means that an executor may be marked as having shuffle data, and thus prevented + // from being removed, even though the data may not be used. if (shuffleTrackingEnabled && event.reason == Success) { stageToShuffleID.get(event.stageId).foreach { shuffleId => exec.addShuffle(shuffleId) From d841287acce53fabb0c00d46e9a846e47110494c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 27 Jun 2019 15:31:49 -0700 Subject: [PATCH 6/8] More logging. --- .../spark/scheduler/dynalloc/ExecutorMonitor.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index cbc584cc4ade..f5beb403555e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -182,16 +182,20 @@ private[spark] class ExecutorMonitor( if (updateExecutors) { val activeShuffleIds = shuffleStages.map(_._2).toSeq var needTimeoutUpdate = false + val activatedExecs = new mutable.ArrayBuffer[String]() executors.asScala.foreach { case (id, exec) => if (!exec.hasActiveShuffle) { exec.updateActiveShuffles(activeShuffleIds) if (exec.hasActiveShuffle) { - logDebug(s"Executor $id has data needed by new active job.") needTimeoutUpdate = true + activatedExecs += id } } } + logDebug(s"Activated executors ${activatedExecs.mkString(",")} due to shuffle data " + + s"needed by new job ${event.jobId}.") + if (needTimeoutUpdate) { nextTimeout.set(Long.MinValue) } @@ -229,11 +233,18 @@ private[spark] class ExecutorMonitor( } } + val deactivatedExecs = new mutable.ArrayBuffer[String]() executors.asScala.foreach { case (id, exec) => if (exec.hasActiveShuffle) { exec.updateActiveShuffles(activeShuffles) + if (!exec.hasActiveShuffle) { + deactivatedExecs += id + } } } + + logDebug(s"Executors ${deactivatedExecs.mkString(",")} do not have active shuffle data " + + s"after job ${event.jobId} finished.") } jobToStageIDs.remove(event.jobId).foreach { stages => From e414f6099d6151cfedd75f4ac1dedf6b73be2d2d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 27 Jun 2019 15:42:00 -0700 Subject: [PATCH 7/8] Another test. --- .../dynalloc/ExecutorMonitorSuite.scala | 61 ++++++++++++++++--- 1 file changed, 54 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala index d108d261dc8e..e11ee97469b0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala @@ -263,13 +263,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { } test("shuffle block tracking") { - // Mock the listener bus *only* for the functionality needed by the shuffle tracking code. - // Any other event sent through the mock bus will fail. - val bus = mock(classOf[LiveListenerBus]) - doAnswer { invocation => - monitor.onOtherEvent(invocation.getArguments()(0).asInstanceOf[SparkListenerEvent]) - }.when(bus).post(any()) - + val bus = mockListenerBus() conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, false) monitor = new ExecutorMonitor(conf, client, bus, clock) @@ -332,6 +326,47 @@ class ExecutorMonitorSuite extends SparkFunSuite { assert(monitor.timedOutExecutors(idleDeadline) === Seq("1")) } + test("shuffle tracking with multiple executors and concurrent jobs") { + val bus = mockListenerBus() + conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, false) + monitor = new ExecutorMonitor(conf, client, bus, clock) + + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null)) + + // Two separate jobs with separate shuffles. The first job will only run tasks on + // executor 1, the second on executor 2. Ensures that jobs finishing don't affect + // executors that are active in other jobs. + + val stage1 = stageInfo(1, shuffleId = 0) + val stage2 = stageInfo(2) + monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage1, stage2))) + + val stage3 = stageInfo(3, shuffleId = 1) + val stage4 = stageInfo(4) + monitor.onJobStart(SparkListenerJobStart(2, clock.getTimeMillis(), Seq(stage3, stage4))) + + monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1))) + monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 1), null)) + assert(monitor.timedOutExecutors(idleDeadline) === Seq("2")) + + monitor.onTaskStart(SparkListenerTaskStart(3, 0, taskInfo("2", 1))) + monitor.onTaskEnd(SparkListenerTaskEnd(3, 0, "foo", Success, taskInfo("2", 1), null)) + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + + monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), JobSucceeded)) + assert(monitor.isExecutorIdle("1")) + assert(!monitor.isExecutorIdle("2")) + + monitor.onJobEnd(SparkListenerJobEnd(2, clock.getTimeMillis(), JobSucceeded)) + assert(monitor.isExecutorIdle("2")) + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + + monitor.shuffleCleaned(0) + monitor.shuffleCleaned(1) + assert(monitor.timedOutExecutors(idleDeadline).toSet === Set("1", "2")) + } + private def idleDeadline: Long = clock.getTimeMillis() + idleTimeoutMs + 1 private def storageDeadline: Long = clock.getTimeMillis() + storageTimeoutMs + 1 private def shuffleDeadline: Long = clock.getTimeMillis() + shuffleTimeoutMs + 1 @@ -365,4 +400,16 @@ class ExecutorMonitorSuite extends SparkFunSuite { RDDBlockId(rddId, splitIndex), level, 1L, 0L)) } + /** + * Mock the listener bus *only* for the functionality needed by the shuffle tracking code. + * Any other event sent through the mock bus will fail. + */ + private def mockListenerBus(): LiveListenerBus = { + val bus = mock(classOf[LiveListenerBus]) + doAnswer { invocation => + monitor.onOtherEvent(invocation.getArguments()(0).asInstanceOf[SparkListenerEvent]) + }.when(bus).post(any()) + bus + } + } From 6154bf486e68dbb5a4c16dc9e71030cc20d8ca58 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 15 Jul 2019 11:05:35 -0700 Subject: [PATCH 8/8] Docs. --- docs/configuration.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index 06b040866f13..108862416f8d 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -2114,6 +2114,26 @@ Apart from these, the following properties are also available, and may be useful description. + + spark.dynamicAllocation.shuffleTracking.enabled + false + + Experimental. Enables shuffle file tracking for executors, which allows dynamic allocation + without the need for an external shuffle service. This option will try to keep alive executors + that are storing shuffle data for active jobs. + + + + spark.dynamicAllocation.shuffleTimeout + infinity + + When shuffle tracking is enabled, controls the timeout for executors that are holding shuffle + data. The default value means that Spark will rely on the shuffles being garbage collected to be + able to release executors. If for some reason garbage collection is not cleaning up shuffles + quickly enough, this option can be used to control when to time out executors even when they are + storing shuffle data. + + ### Thread Configurations