From 2cc0022a9f3aed037ec3b1f680ef7984abdfbc0b Mon Sep 17 00:00:00 2001 From: Nilanjan Raychaudhuri Date: Mon, 11 Jan 2016 14:42:39 -0500 Subject: [PATCH 1/2] [SPARK-7831][Mesos] Added flag to shutdown driver when mesos dispatcher is stopped --- .../apache/spark/deploy/mesos/MesosClusterDispatcher.scala | 2 +- .../deploy/mesos/MesosClusterDispatcherArguments.scala | 6 ++++++ .../scheduler/cluster/mesos/MesosClusterScheduler.scala | 5 +++-- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala index 66e1e645007a..b630d333792d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala @@ -59,7 +59,7 @@ private[mesos] class MesosClusterDispatcher( case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode) } - private val scheduler = new MesosClusterScheduler(engineFactory, conf) + private val scheduler = new MesosClusterScheduler(engineFactory, conf, args.driverFailOver) private val server = new MesosRestServer(args.host, args.port, conf, scheduler) private val webUi = new MesosClusterUI( diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala index 5accaf78d0a5..387ee6679d6f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala @@ -29,6 +29,7 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: var masterUrl: String = _ var zookeeperUrl: Option[String] = None var propertiesFile: String = _ + var driverFailOver = true // keep the driver registered with Mesos after dispatcher is stopped parse(args.toList) @@ -70,6 +71,10 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: propertiesFile = value parse(tail) + case ("--disable-failover") :: tail => + driverFailOver = false + parse(tail) + case ("--help") :: tail => printUsageAndExit(0) @@ -97,6 +102,7 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: " --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" + " --name NAME Framework name to show in Mesos UI\n" + " -m --master MASTER URI for connecting to Mesos master\n" + + " --disable-failover Will kill the driver when dispatcher is stopped\n" + " -z --zk ZOOKEEPER Comma delimited URLs for connecting to \n" + " Zookeeper for persistence\n" + " --properties-file FILE Path to a custom Spark properties file.\n" + diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 05fda0fded7f..1b2308c275b7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -114,7 +114,8 @@ private[spark] class MesosDriverState( */ private[spark] class MesosClusterScheduler( engineFactory: MesosClusterPersistenceEngineFactory, - conf: SparkConf) + conf: SparkConf, + driverFailOver: Boolean = true) extends Scheduler with MesosSchedulerUtils { var frameworkUrl: String = _ private val metricsSystem = @@ -318,7 +319,7 @@ private[spark] class MesosClusterScheduler( ready = false metricsSystem.report() metricsSystem.stop() - mesosDriver.stop(true) + mesosDriver.stop(driverFailOver) } override def registered( From 9002258fbede7d8da6934c8b4ea5512cac999db7 Mon Sep 17 00:00:00 2001 From: Nilanjan Raychaudhuri Date: Fri, 15 Jan 2016 12:01:30 -0500 Subject: [PATCH 2/2] Setting the driver failover timeout --- .../deploy/mesos/MesosClusterDispatcherArguments.scala | 2 +- .../scheduler/cluster/mesos/MesosClusterScheduler.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala index 387ee6679d6f..5d9c41dc331c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala @@ -102,7 +102,7 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: " --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" + " --name NAME Framework name to show in Mesos UI\n" + " -m --master MASTER URI for connecting to Mesos master\n" + - " --disable-failover Will kill the driver when dispatcher is stopped\n" + + " --disable-failover De-register the framework when dispatcher is stopped\n" + " -z --zk ZOOKEEPER Comma delimited URLs for connecting to \n" + " Zookeeper for persistence\n" + " --properties-file FILE Path to a custom Spark properties file.\n" + diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 1b2308c275b7..1b863913f3ff 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -307,8 +307,8 @@ private[spark] class MesosClusterScheduler( appName, conf, Some(frameworkUrl), - Some(true), - Some(Integer.MAX_VALUE), + Some(driverFailOver), // with checkpoint data if failOver is true + Some(if (driverFailOver) Double.MaxValue else 0.0), // timeout, 0.0 means no recovery fwId) startScheduler(driver) @@ -319,7 +319,7 @@ private[spark] class MesosClusterScheduler( ready = false metricsSystem.report() metricsSystem.stop() - mesosDriver.stop(driverFailOver) + mesosDriver.stop(true) } override def registered(