Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, List => JList}

import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, HashSet}

import com.google.common.collect.HashBiMap
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}

import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.util.Utils
import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}


import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}

/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
Expand Down Expand Up @@ -64,16 +65,31 @@ private[spark] class CoarseMesosSchedulerBackend(
// This is for cleaning up shuffle files reliably.
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)

private val maxExecutorsPerSlave = conf.getInt("spark.mesos.coarse.executors.max", 1)

private val maxCpusPerExecutor =
conf.getOption("spark.mesos.coarse.executor.cores.max").map { m => m.toInt }

// A per-slave executor limit requires a per-executor core cap: when no cap is set, the
// launch loop sizes an executor with Int.MaxValue as the per-executor bound, so a single
// executor would claim all of an offer's remaining cores.
// The message must name the same key that maxCpusPerExecutor is read from.
if (conf.getOption("spark.mesos.coarse.executors.max").isDefined && maxCpusPerExecutor.isEmpty) {
  throw new IllegalArgumentException(
    "Must configure spark.mesos.coarse.executor.cores.max when " +
    "spark.mesos.coarse.executors.max is set")
}

// Cores we have acquired with each Mesos task ID
val coresByTaskId = new HashMap[Int, Int]
val coresByTaskId = new HashMap[String, Int]
var totalCoresAcquired = 0

val slaveIdsWithExecutors = new HashSet[String]

// Mapping from slave ID to hostname
private val slaveIdToHost = new HashMap[String, String]

val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
// Contains the set of slave IDs whose shuffle service we have already connected to
private val existingSlaveShuffleConnections = new HashSet[String]

// Maps each slave ID to the number of executors launched on that slave.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

number of executors launched on that slave

val slaveIdsWithExecutors = new HashMap[String, Int]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we changed this variable to a HashMap with a different meaning, shall we change the name of this variable?

"slaveIdsToExecutorNumber"?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or slaveIdsToNumExecutors


val taskIdToSlaveId: HashMap[String, String] = new HashMap[String, String]
// How many times tasks on each slave failed
val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]

Expand All @@ -89,8 +105,6 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)

private val pendingRemovedSlaveIds = new HashSet[String]

// private lock object protecting mutable state above. Using the intrinsic lock
// may lead to deadlocks since the superclass might also try to lock
private val stateLock = new ReentrantLock
Expand Down Expand Up @@ -122,10 +136,10 @@ private[spark] class CoarseMesosSchedulerBackend(

@volatile var appId: String = _

def newMesosTaskId(): Int = {
def newMesosTaskId(slaveId: String): String = {
val id = nextMesosTaskId
nextMesosTaskId += 1
id
slaveId + "/" + id
}

override def start() {
Expand All @@ -140,7 +154,7 @@ private[spark] class CoarseMesosSchedulerBackend(
startScheduler(driver)
}

def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
val executorSparkHome = conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome())
.getOrElse {
Expand Down Expand Up @@ -184,20 +198,20 @@ private[spark] class CoarseMesosSchedulerBackend(
"%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
.format(prefixEnv, runScript) +
s" --driver-url $driverURL" +
s" --executor-id ${offer.getSlaveId.getValue}" +
s" --executor-id $taskId" +
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.get.split('/').last.split('.').head
val executorId = sparkExecutorId(offer.getSlaveId.getValue, taskId.toString)

command.setValue(
s"cd $basename*; $prefixEnv " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
s" --driver-url $driverURL" +
s" --executor-id $executorId" +
s" --executor-id $taskId" +
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
Expand Down Expand Up @@ -244,58 +258,63 @@ private[spark] class CoarseMesosSchedulerBackend(
* unless we've already launched more than we wanted to.
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
val memoryPerExecutor = calculateTotalMemory(sc)
stateLock.synchronized {
val filters = Filters.newBuilder().setRefuseSeconds(5).build()
for (offer <- offers.asScala) {
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
var remainingMem = mem
var remainingCores = cpus
val tasks = new util.ArrayList[MesosTaskInfo]()
val offerAttributes = toAttributeMap(offer.getAttributesList)
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val slaveId = offer.getSlaveId.getValue
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
val id = offer.getId.getValue
if (meetsConstraints) {
if (taskIdToSlaveId.size < executorLimit &&
totalCoresAcquired < maxCores &&
mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
// Launch an executor on the slave
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
totalCoresAcquired += cpusToUse
val taskId = newMesosTaskId()
taskIdToSlaveId.put(taskId, slaveId)
slaveIdsWithExecutors += slaveId
coresByTaskId(taskId) = cpusToUse
// Gather cpu resources from the available resources and use them in the task.
val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.getResourcesList, "cpus", cpusToUse)
val (_, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
.setName("Task " + taskId)
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
}

// Accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
d.launchTasks(
Collections.singleton(offer.getId),
Collections.singleton(taskBuilder.build()), filters)
} else {
// Decline the offer
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
d.declineOffer(offer.getId)
var executorCount = slaveIdsWithExecutors.getOrElse(slaveId, 0)
while (taskIdToSlaveId.size < executorLimit &&
totalCoresAcquired < maxCores &&
meetsConstraints &&
remainingMem >= calculateTotalMemory(sc) &&
remainingCores >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
executorCount < maxExecutorsPerSlave) {
val coresToUse =
math.min(maxCpusPerExecutor.getOrElse(Int.MaxValue),
math.min(remainingCores, maxCores - totalCoresAcquired))
totalCoresAcquired += coresToUse
remainingCores -= coresToUse
remainingMem -= memoryPerExecutor
val taskId = newMesosTaskId(slaveId)
taskIdToSlaveId(taskId) = slaveId
executorCount += 1
slaveIdsWithExecutors(slaveId) = executorCount
coresByTaskId(taskId) = coresToUse
// Gather cpu resources from the available resources and use them in the task.
val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.getResourcesList, "cpus", coresToUse)
val (_, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, coresToUse + extraCoresPerSlave, taskId))
.setName("Task " + taskId)
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
}
tasks.add(taskBuilder.build())
}

if (!tasks.isEmpty) {
// accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
d.launchTasks(Collections.singleton(offer.getId()), tasks, filters)
} else {
// This offer does not meet constraints. We don't need to see it again.
// Decline the offer for a long period of time.
Expand All @@ -310,7 +329,7 @@ private[spark] class CoarseMesosSchedulerBackend(


override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt
val taskId = status.getTaskId.getValue
val state = status.getState
logInfo(s"Mesos task $taskId is now $state")
val slaveId: String = status.getSlaveId.getValue
Expand All @@ -321,7 +340,8 @@ private[spark] class CoarseMesosSchedulerBackend(
// this through Mesos, since the shuffle services are set up independently.
if (TaskState.fromMesos(state).equals(TaskState.RUNNING) &&
slaveIdToHost.contains(slaveId) &&
shuffleServiceEnabled) {
shuffleServiceEnabled &&
!existingSlaveShuffleConnections.contains(slaveId)) {
assume(mesosExternalShuffleClient.isDefined,
"External shuffle client was not instantiated even though shuffle service is enabled.")
// TODO: Remove this and allow the MesosExternalShuffleService to detect
Expand All @@ -332,12 +352,10 @@ private[spark] class CoarseMesosSchedulerBackend(
s"host $hostname, port $externalShufflePort for app ${conf.getAppId}")
mesosExternalShuffleClient.get
.registerDriverWithShuffleService(hostname, externalShufflePort)
}
existingSlaveShuffleConnections += slaveId
} else if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId(taskId)

if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId.get(taskId)
slaveIdsWithExecutors -= slaveId
taskIdToSlaveId.remove(taskId)
// Remove the cores we have remembered for this task, if it's in the hashmap
for (cores <- coresByTaskId.get(taskId)) {
totalCoresAcquired -= cores
Expand All @@ -351,7 +369,7 @@ private[spark] class CoarseMesosSchedulerBackend(
"is Spark installed on it?")
}
}
executorTerminated(d, slaveId, s"Executor finished with state $state")
executorTerminated(d, taskId, slaveId, s"Executor finished with state $state")
// In case we'd rejected everything before but have now lost a node
d.reviveOffers()
}
Expand All @@ -378,35 +396,39 @@ private[spark] class CoarseMesosSchedulerBackend(
* slave IDs that we might have asked to be killed. It also notifies the driver
* that an executor was removed.
*/
private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
private def executorTerminated(
d: SchedulerDriver,
executorId: String,
slaveId: String,
reason: String): Unit = {
stateLock.synchronized {
if (slaveIdsWithExecutors.contains(slaveId)) {
val slaveIdToTaskId = taskIdToSlaveId.inverse()
if (slaveIdToTaskId.containsKey(slaveId)) {
val taskId: Int = slaveIdToTaskId.get(slaveId)
taskIdToSlaveId.remove(taskId)
removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
if (slaveIdsWithExecutors.contains(slaveId) && taskIdToSlaveId.contains(executorId)) {
taskIdToSlaveId.remove(executorId)
removeExecutor(executorId, new ExecutorLossReason(reason))
val newCount = slaveIdsWithExecutors(slaveId) - 1
if (newCount == 0) {
slaveIdsWithExecutors.remove(slaveId)
} else {
slaveIdsWithExecutors(slaveId) = newCount
}
// TODO: This assumes one Spark executor per Mesos slave,
// which may no longer be true after SPARK-5095
pendingRemovedSlaveIds -= slaveId
slaveIdsWithExecutors -= slaveId
}
}
}

/**
 * Builds the Spark executor ID for a Mesos task.
 *
 * @param slaveId ID of the Mesos slave the task runs on
 * @param taskId  ID of the Mesos task
 * @return the two IDs joined by a single "/" separator
 */
private def sparkExecutorId(slaveId: String, taskId: String): String =
  slaveId + "/" + taskId

override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
logInfo(s"Mesos slave lost: ${slaveId.getValue}")
executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
// Terminate all executors in the slave
stateLock.synchronized {
val lostExecutors = taskIdToSlaveId.filter(_._2.equals(slaveId.getValue)).map(_._1)
lostExecutors.foreach { taskId =>
executorTerminated(d, taskId, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
}
}
}

override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't executorLost also get an update? Right now it calls slaveLost, which removes all executors on the given slave; now that a slave can run multiple executors, that seems wrong.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I've updated it now.

logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
slaveLost(d, s)
logInfo("Executor lost: %s".format(e.getValue))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpicking: you could use string interpolation: s"Executor lost: ${e.getValue}"

executorTerminated(d, e.getValue, s.getValue, "Mesos Executor lost: " + e.getValue)
}

override def applicationId(): String =
Expand All @@ -429,13 +451,9 @@ private[spark] class CoarseMesosSchedulerBackend(
return false
}

val slaveIdToTaskId = taskIdToSlaveId.inverse()
for (executorId <- executorIds) {
val slaveId = executorId.split("/")(0)
if (slaveIdToTaskId.containsKey(slaveId)) {
mesosDriver.killTask(
TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build())
pendingRemovedSlaveIds += slaveId
if (taskIdToSlaveId.contains(executorId)) {
mesosDriver.killTask(TaskID.newBuilder().setValue(executorId).build())
} else {
logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
}
Expand Down
Loading