diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 8d5ad12cb85be..bd4963a08bfda 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -356,6 +356,16 @@ See the [configuration page](configuration.html) for information on Spark config
By default Mesos agents will not pull images they already have cached.
+
+ spark.mesos.executor.docker.parameters |
+ (none) |
+
+ Set the list of custom parameters which will be passed into the docker run command when launching the Spark executor on Mesos using the docker containerizer. The format of this property is a comma-separated list of
+ key/value pairs. Example:
+
+ key1=val1,key2=val2,key3=val3
+ |
+
spark.mesos.executor.docker.volumes |
(none) |
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index a2adb228dc299..fbcbc55099ec5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler.cluster.mesos
-import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Volume}
+import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, Volume}
import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}
import org.apache.spark.{SparkConf, SparkException}
@@ -99,6 +99,28 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
.toList
}
+ /**
+ * Parse a list of docker parameters, each of which
+ * takes the form key=value
+ */
+ private def parseParamsSpec(params: String): List[Parameter] = {
+ // split with limit of 2 to avoid parsing error when '='
+ // exists in the parameter value
+ params.split(",").map(_.split("=", 2)).flatMap { spec: Array[String] =>
+ val param: Parameter.Builder = Parameter.newBuilder()
+ spec match {
+ case Array(key, value) =>
+ Some(param.setKey(key).setValue(value))
+ case spec =>
+ logWarning(s"Unable to parse arbitary parameters: $params. "
+ + "Expected form: \"key=value(, ...)\"")
+ None
+ }
+ }
+ .map { _.build() }
+ .toList
+ }
+
def containerInfo(conf: SparkConf): ContainerInfo = {
val containerType = if (conf.contains("spark.mesos.executor.docker.image") &&
conf.get("spark.mesos.containerizer", "docker") == "docker") {
@@ -120,8 +142,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
.map(parsePortMappingsSpec)
.getOrElse(List.empty)
+ val params = conf
+ .getOption("spark.mesos.executor.docker.parameters")
+ .map(parseParamsSpec)
+ .getOrElse(List.empty)
+
if (containerType == ContainerInfo.Type.DOCKER) {
- containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps))
+ containerInfo
+ .setDocker(dockerInfo(image, forcePullImage, portMaps, params))
} else {
containerInfo.setMesos(mesosInfo(image, forcePullImage))
}
@@ -144,11 +172,13 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
private def dockerInfo(
image: String,
forcePullImage: Boolean,
- portMaps: List[ContainerInfo.DockerInfo.PortMapping]): DockerInfo = {
+ portMaps: List[ContainerInfo.DockerInfo.PortMapping],
+ params: List[Parameter]): DockerInfo = {
val dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
.setImage(image)
.setForcePullImage(forcePullImage)
portMaps.foreach(dockerBuilder.addPortMappings(_))
+ params.foreach(dockerBuilder.addParameters(_))
dockerBuilder.build
}
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
new file mode 100644
index 0000000000000..caf9d89fdd201
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
+
+ test("ContainerInfo fails to parse invalid docker parameters") {
+ val conf = new SparkConf()
+ conf.set("spark.mesos.executor.docker.parameters", "a,b")
+ conf.set("spark.mesos.executor.docker.image", "test")
+
+ val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf)
+ val params = containerInfo.getDocker.getParametersList
+
+ assert(params.size() == 0)
+ }
+
+ test("ContainerInfo parses docker parameters") {
+ val conf = new SparkConf()
+ conf.set("spark.mesos.executor.docker.parameters", "a=1,b=2,c=3")
+ conf.set("spark.mesos.executor.docker.image", "test")
+
+ val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf)
+ val params = containerInfo.getDocker.getParametersList
+ assert(params.size() == 3)
+ assert(params.get(0).getKey == "a")
+ assert(params.get(0).getValue == "1")
+ assert(params.get(1).getKey == "b")
+ assert(params.get(1).getValue == "2")
+ assert(params.get(2).getKey == "c")
+ assert(params.get(2).getValue == "3")
+ }
+}