From 481abaf68aecdf3d6a79aef80895edb88bc65ad7 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Thu, 27 Jun 2019 13:06:53 -0500 Subject: [PATCH 1/6] [SPARK-27959] Change YARN resource configs to use .amount --- docs/running-on-yarn.md | 14 ++-- .../org/apache/spark/deploy/yarn/Client.scala | 14 ++-- .../deploy/yarn/ResourceRequestHelper.scala | 43 +++++++++++- .../spark/deploy/yarn/YarnAllocator.scala | 5 +- .../deploy/yarn/YarnSparkHadoopUtil.scala | 18 ----- .../spark/deploy/yarn/ClientSuite.scala | 2 +- .../yarn/ResourceRequestHelperSuite.scala | 66 ++++++++++++++++--- .../deploy/yarn/YarnAllocatorSuite.scala | 7 +- 8 files changed, 120 insertions(+), 49 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index dc93e9cea5bce..9d9b253a5c8ea 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -142,20 +142,20 @@ To use a custom metrics.properties for the application master and executors, upd - spark.yarn.am.resource.{resource-type} + spark.yarn.am.resource.{resource-type}.amount (none) Amount of resource to use for the YARN Application Master in client mode. - In cluster mode, use spark.yarn.driver.resource.<resource-type> instead. + In cluster mode, use spark.yarn.driver.resource.<resource-type>.amount instead. Please note that this feature can be used only with YARN 3.0+ For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html

Example: - To request GPU resources from YARN, use: spark.yarn.am.resource.yarn.io/gpu + To request GPU resources from YARN, use: spark.yarn.am.resource.yarn.io/gpu.amount - spark.yarn.driver.resource.{resource-type} + spark.yarn.driver.resource.{resource-type}.amount (none) Amount of resource to use for the YARN Application Master in cluster mode. @@ -163,11 +163,11 @@ To use a custom metrics.properties for the application master and executors, upd For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html

Example: - To request GPU resources from YARN, use: spark.yarn.driver.resource.yarn.io/gpu + To request GPU resources from YARN, use: spark.yarn.driver.resource.yarn.io/gpu.amount - spark.yarn.executor.resource.{resource-type} + spark.yarn.executor.resource.{resource-type}.amount (none) Amount of resource to use per executor process. @@ -175,7 +175,7 @@ To use a custom metrics.properties for the application master and executors, upd For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html

Example: - To request GPU resources from YARN, use: spark.yarn.executor.resource.yarn.io/gpu + To request GPU resources from YARN, use: spark.yarn.executor.resource.yarn.io/gpu.amount diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 5b361d17c01a9..651e706021fcb 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -51,7 +51,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.api.python.PythonUtils import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil} import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ +import org.apache.spark.deploy.yarn.ResourceRequestHelper._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -241,12 +241,12 @@ private[spark] class Client( newApp: YarnClientApplication, containerContext: ContainerLaunchContext): ApplicationSubmissionContext = { - val yarnAMResources = - if (isClusterMode) { - sparkConf.getAllWithPrefix(config.YARN_DRIVER_RESOURCE_TYPES_PREFIX).toMap - } else { - sparkConf.getAllWithPrefix(config.YARN_AM_RESOURCE_TYPES_PREFIX).toMap - } + val componentName = if (isClusterMode) { + config.YARN_DRIVER_RESOURCE_TYPES_PREFIX + } else { + config.YARN_AM_RESOURCE_TYPES_PREFIX + } + val yarnAMResources = getYarnResourcesAndAmounts(sparkConf, componentName) val amResources = yarnAMResources ++ getYarnResourcesFromSparkResources(SPARK_DRIVER_PREFIX, sparkConf) logDebug(s"AM resources: $amResources") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala index cb0c68d1d346d..31fdcb448d025 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala @@ -26,11 +26,11 @@ import scala.util.Try import org.apache.hadoop.yarn.api.records.Resource import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceID +import org.apache.spark.resource.ResourceUtils.{AMOUNT, FPGA, GPU} import org.apache.spark.util.{CausedBy, Utils} /** @@ -40,6 +40,45 @@ import org.apache.spark.util.{CausedBy, Utils} private object ResourceRequestHelper extends Logging { private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r private val RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation" + val YARN_GPU_RESOURCE_CONFIG = "yarn.io/gpu" + val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/fpga" + + private[yarn] def getYarnResourcesAndAmounts( + sparkConf: SparkConf, + componentName: String): Map[String, String] = { + sparkConf.getAllWithPrefix(s"$componentName").map { case (key, value) => + val splitIndex = key.lastIndexOf('.') + if (splitIndex == -1) { + val errorMessage = s"Missing suffix for ${componentName}${key}, you must specify" + + s" a suffix - $AMOUNT is currently the only supported suffix." + throw new IllegalArgumentException(errorMessage.toString()) + } + val resourceName = key.substring(0, splitIndex) + val resourceSuffix = key.substring(splitIndex + 1) + if (!AMOUNT.equals(resourceSuffix)) { + val errorMessage = s"Unsupported suffix: $resourceSuffix in: ${componentName}${key}," + + s"only .$AMOUNT is supported." + throw new IllegalArgumentException(errorMessage.toString()) + } + (resourceName, value) + }.toMap + } + + /** + * Convert Spark resources into YARN resources. + * The only resources we know how to map from spark configs to yarn configs are + * gpus and fpgas, everything else the user has to specify them in both the + * spark.yarn.*.resource and the spark.*.resource configs. + */ + private[yarn] def getYarnResourcesFromSparkResources( + confPrefix: String, + sparkConf: SparkConf + ): Map[String, String] = { + Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> YARN_FPGA_RESOURCE_CONFIG).map { + case (rName, yarnName) => + (yarnName -> sparkConf.get(ResourceID(confPrefix, rName).amountConf, "0")) + }.filter { case (_, count) => count.toLong > 0 } + } /** * Validates sparkConf and throws a SparkException if any of standard resources (memory or cores) @@ -81,7 +120,7 @@ private object ResourceRequestHelper extends Logging { val errorMessage = new mutable.StringBuilder() resourceDefinitions.foreach { case (sparkName, resourceRequest) => - if (sparkConf.contains(resourceRequest)) { + if (sparkConf.contains(s"${resourceRequest}.${AMOUNT}")) { errorMessage.append(s"Error: Do not use $resourceRequest, " + s"please use $sparkName instead!\n") } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 6e634b921fcd1..8ec7bd66b2507 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.{SecurityManager, SparkConf, SparkException} +import org.apache.spark.deploy.yarn.ResourceRequestHelper._ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging @@ -142,8 +143,8 @@ private[yarn] class YarnAllocator( protected val executorCores = sparkConf.get(EXECUTOR_CORES) private val executorResourceRequests = - sparkConf.getAllWithPrefix(config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap ++ - getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf) + getYarnResourcesAndAmounts(sparkConf, config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++ + getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf) // Resource capability requested for each executor private[yarn] val resource: Resource = { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 6b87eec795f9f..11035520ae185 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -41,24 +41,6 @@ object YarnSparkHadoopUtil { val MEMORY_OVERHEAD_MIN = 384L val ANY_HOST = "*" - val YARN_GPU_RESOURCE_CONFIG = "yarn.io/gpu" - val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/fpga" - - /** - * Convert Spark resources into YARN resources. - * The only resources we know how to map from spark configs to yarn configs are - * gpus and fpgas, everything else the user has to specify them in both the - * spark.yarn.*.resource and the spark.*.resource configs. - */ - private[yarn] def getYarnResourcesFromSparkResources( - confPrefix: String, - sparkConf: SparkConf - ): Map[String, String] = { - Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> YARN_FPGA_RESOURCE_CONFIG).map { - case (rName, yarnName) => - (yarnName -> sparkConf.get(ResourceID(confPrefix, rName).amountConf, "0")) - }.filter { case (_, count) => count.toLong > 0 } - } // All RM requests are issued with same priority : we do not (yet) have any distinction between // request types (like map/reduce in hadoop for example) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index d5f1992a09f53..d4e9a775322ba 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -38,7 +38,7 @@ import org.mockito.Mockito.{spy, verify} import org.scalatest.Matchers import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils} -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ +import org.apache.spark.deploy.yarn.ResourceRequestHelper._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceID diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala index 9e3cc6ec01dfd..c2e9cf60b6c1f 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala @@ -22,9 +22,11 @@ import org.apache.hadoop.yarn.util.Records import org.scalatest.Matchers import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} +import org.apache.spark.deploy.yarn.ResourceRequestHelper._ import org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.config.{DRIVER_CORES, DRIVER_MEMORY, EXECUTOR_CORES, EXECUTOR_MEMORY} +import org.apache.spark.resource.ResourceUtils.AMOUNT class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { @@ -41,7 +43,7 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { test("empty SparkConf should be valid") { val sparkConf = new SparkConf() - ResourceRequestHelper.validateResources(sparkConf) + validateResources(sparkConf) } test("just normal resources are defined") { @@ -50,7 +52,53 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { sparkConf.set(DRIVER_CORES.key, "4") sparkConf.set(EXECUTOR_MEMORY.key, "4G") sparkConf.set(EXECUTOR_CORES.key, "2") - ResourceRequestHelper.validateResources(sparkConf) + validateResources(sparkConf) + } + + test("get yarn resources from configs") { + val sparkConf = new SparkConf() + val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "2G", + YARN_FPGA_RESOURCE_CONFIG -> "3G", "custom" -> "4") + resources.foreach { case (name, value) => + sparkConf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${name}.${AMOUNT}", value) + sparkConf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${name}.${AMOUNT}", value) + sparkConf.set(s"${YARN_AM_RESOURCE_TYPES_PREFIX}${name}.${AMOUNT}", value) + } + var parsedResources = getYarnResourcesAndAmounts(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) + assert(resources.size === 3) + resources.foreach { case (key, value) => + assert(parsedResources(key) === value) + } + parsedResources = getYarnResourcesAndAmounts(sparkConf, YARN_DRIVER_RESOURCE_TYPES_PREFIX) + assert(resources.size === 3) + resources.foreach { case (key, value) => + assert(parsedResources(key) === value) + } + parsedResources = getYarnResourcesAndAmounts(sparkConf, YARN_AM_RESOURCE_TYPES_PREFIX) + assert(resources.size === 3) + resources.foreach { case (key, value) => + assert(parsedResources(key) === value) + } + } + + test("get invalid yarn resources from configs") { + val sparkConf = new SparkConf() + + val missingAmountConfig = s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}missingAmount" + // missing .amount + sparkConf.set(missingAmountConfig, "2g") + var thrown = intercept[IllegalArgumentException] { + getYarnResourcesAndAmounts(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) + } + thrown.getMessage should include("Missing suffix for") + + sparkConf.remove(missingAmountConfig) + sparkConf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}customResource.invalid", "2g") + + thrown = intercept[IllegalArgumentException] { + getYarnResourcesAndAmounts(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) + } + thrown.getMessage should include("Unsupported suffix") } Seq( @@ -60,14 +108,14 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { ResourceInformation(CUSTOM_RES_2, 10, "G")) ).foreach { case (name, resources) => test(s"valid request: $name") { - assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) + assume(isYarnResourceTypesAvailable()) val resourceDefs = resources.map { r => r.name } val requests = resources.map { r => (r.name, r.value.toString + r.unit) }.toMap ResourceRequestTestHelper.initializeResourceTypes(resourceDefs) val resource = createResource() - ResourceRequestHelper.setResourceRequests(requests, resource) + setResourceRequests(requests, resource) resources.foreach { r => val requested = ResourceRequestTestHelper.getResourceInformationByName(resource, r.name) @@ -82,12 +130,12 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { ("invalid unit", CUSTOM_RES_1, "123ppp") ).foreach { case (name, key, value) => test(s"invalid request: $name") { - assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) + assume(isYarnResourceTypesAvailable()) ResourceRequestTestHelper.initializeResourceTypes(Seq(key)) val resource = createResource() val thrown = intercept[IllegalArgumentException] { - ResourceRequestHelper.setResourceRequests(Map(key -> value), resource) + setResourceRequests(Map(key -> value), resource) } thrown.getMessage should include (key) } @@ -105,10 +153,10 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { NEW_CONFIG_DRIVER_CORES -> "1G" ).foreach { case (key, value) => test(s"disallowed resource request: $key") { - assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) + assume(isYarnResourceTypesAvailable()) val conf = new SparkConf(false).set(key, value) val thrown = intercept[SparkException] { - ResourceRequestHelper.validateResources(conf) + validateResources(conf) } thrown.getMessage should include (key) } @@ -126,7 +174,7 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { sparkConf.set(NEW_CONFIG_DRIVER_MEMORY, "2G") val thrown = intercept[SparkException] { - ResourceRequestHelper.validateResources(sparkConf) + validateResources(sparkConf) } thrown.getMessage should ( include(NEW_CONFIG_EXECUTOR_MEMORY) and diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index ca89af26230f9..09dc361f5444f 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -31,6 +31,7 @@ import org.scalatest.{BeforeAndAfterEach, Matchers} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.yarn.ResourceRequestHelper._ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.config._ @@ -160,7 +161,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter } test("custom resource requested from yarn") { - assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) + assume(isYarnResourceTypesAvailable()) ResourceRequestTestHelper.initializeResourceTypes(List("gpu")) val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]]) @@ -174,7 +175,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter // get amount of memory and vcores from resource, so effectively skipping their validation val expectedResources = Resource.newInstance(handler.resource.getMemory(), handler.resource.getVirtualCores) - ResourceRequestHelper.setResourceRequests(Map("gpu" -> "2G"), expectedResources) + setResourceRequests(Map("gpu" -> "2G"), expectedResources) val captor = ArgumentCaptor.forClass(classOf[ContainerRequest]) verify(mockAmClient).addContainerRequest(captor.capture()) @@ -183,7 +184,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter } test("custom spark resource mapped to yarn resource configs") { - assume(ResourceRequestHelper.isYarnResourceTypesAvailable()) + assume(isYarnResourceTypesAvailable()) val yarnMadeupResource = "yarn.io/madeup" val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG, YARN_FPGA_RESOURCE_CONFIG, yarnMadeupResource) ResourceRequestTestHelper.initializeResourceTypes(yarnResources) From bef554b6e4d230f229bb23117a223e5b57493d72 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Thu, 27 Jun 2019 13:18:44 -0500 Subject: [PATCH 2/6] Fix unit test --- .../apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala index c2e9cf60b6c1f..1263d785d4ed1 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala @@ -32,8 +32,8 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { private val CUSTOM_RES_1 = "custom-resource-type-1" private val CUSTOM_RES_2 = "custom-resource-type-2" - private val MEMORY = "memory" - private val CORES = "cores" + private val MEMORY = "memory.amount" + private val CORES = "cores.amount" private val NEW_CONFIG_EXECUTOR_MEMORY = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + MEMORY private val NEW_CONFIG_EXECUTOR_CORES = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + CORES private val NEW_CONFIG_AM_MEMORY = YARN_AM_RESOURCE_TYPES_PREFIX + MEMORY From 428f63a6952bd2a65109a25b68c3627e3e5dcb49 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Thu, 27 Jun 2019 13:43:04 -0500 Subject: [PATCH 3/6] fix conflicting resources --- .../org/apache/spark/deploy/yarn/ResourceRequestHelper.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala index 31fdcb448d025..51766c3671c68 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala @@ -120,8 +120,9 @@ private object ResourceRequestHelper extends Logging { val errorMessage = new mutable.StringBuilder() resourceDefinitions.foreach { case (sparkName, resourceRequest) => - if (sparkConf.contains(s"${resourceRequest}.${AMOUNT}")) { - errorMessage.append(s"Error: Do not use $resourceRequest, " + + val resourceRequestAmount = s"${resourceRequest}.${AMOUNT}" + if (sparkConf.contains(resourceRequestAmount)) { + errorMessage.append(s"Error: Do not use $resourceRequestAmount, " + s"please use $sparkName instead!\n") } } From 0d87d0883bbe531afcbd11eee44dda33ae59bc9b Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 28 Jun 2019 13:50:52 -0500 Subject: [PATCH 4/6] Fix tests for change to .amount --- .../spark/deploy/yarn/ClientSuite.scala | 17 ++++++------- .../yarn/ResourceRequestHelperSuite.scala | 24 ++++++++++--------- .../deploy/yarn/YarnAllocatorSuite.scala | 6 +++-- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index d4e9a775322ba..77bd988b42d8b 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -38,6 +38,7 @@ import org.mockito.Mockito.{spy, verify} import org.scalatest.Matchers import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils} +import org.apache.spark.resource.ResourceUtils.AMOUNT import org.apache.spark.deploy.yarn.ResourceRequestHelper._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.config._ @@ -372,7 +373,7 @@ class ClientSuite extends SparkFunSuite with Matchers { val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, deployMode) resources.foreach { case (name, v) => - conf.set(prefix + name, v.toString) + conf.set(s"${prefix}${name}.${AMOUNT}", v.toString) } val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) @@ -397,7 +398,7 @@ class ClientSuite extends SparkFunSuite with Matchers { val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster") resources.keys.foreach { yarnName => - conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}", "2") + conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2") } resources.values.foreach { rName => conf.set(ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3") @@ -407,9 +408,9 @@ class ClientSuite extends SparkFunSuite with Matchers { ResourceRequestHelper.validateResources(conf) }.getMessage() - assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga," + + assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga.amount," + " please use spark.driver.resource.fpga.amount")) - assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu," + + assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu.amount," + " please use spark.driver.resource.gpu.amount")) } @@ -420,7 +421,7 @@ class ClientSuite extends SparkFunSuite with Matchers { val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster") resources.keys.foreach { yarnName => - conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}", "2") + conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2") } resources.values.foreach { rName => conf.set(ResourceID(SPARK_EXECUTOR_PREFIX, rName).amountConf, "3") @@ -430,9 +431,9 @@ class ClientSuite extends SparkFunSuite with Matchers { ResourceRequestHelper.validateResources(conf) }.getMessage() - assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/fpga," + + assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/fpga.amount," + " please use spark.executor.resource.fpga.amount")) - assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/gpu," + + assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/gpu.amount," + " please use spark.executor.resource.gpu.amount")) } @@ -450,7 +451,7 @@ class ClientSuite extends SparkFunSuite with Matchers { conf.set(ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3") } // also just set yarn one that we don't convert - conf.set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + yarnMadeupResource, "5") + conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}", "5") val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala index 1263d785d4ed1..efea747c11489 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala @@ -32,14 +32,16 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { private val CUSTOM_RES_1 = "custom-resource-type-1" private val CUSTOM_RES_2 = "custom-resource-type-2" - private val MEMORY = "memory.amount" - private val CORES = "cores.amount" - private val NEW_CONFIG_EXECUTOR_MEMORY = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + MEMORY - private val NEW_CONFIG_EXECUTOR_CORES = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + CORES - private val NEW_CONFIG_AM_MEMORY = YARN_AM_RESOURCE_TYPES_PREFIX + MEMORY - private val NEW_CONFIG_AM_CORES = YARN_AM_RESOURCE_TYPES_PREFIX + CORES - private val NEW_CONFIG_DRIVER_MEMORY = YARN_DRIVER_RESOURCE_TYPES_PREFIX + MEMORY - private val NEW_CONFIG_DRIVER_CORES = YARN_DRIVER_RESOURCE_TYPES_PREFIX + CORES + private val MEMORY = "memory" + private val CORES = "cores" + private val NEW_CONFIG_EXECUTOR_MEMORY = + s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${MEMORY}.${AMOUNT}" + private val NEW_CONFIG_EXECUTOR_CORES = + s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${CORES}.${AMOUNT}" + private val NEW_CONFIG_AM_MEMORY = s"${YARN_AM_RESOURCE_TYPES_PREFIX}${MEMORY}.${AMOUNT}" + private val NEW_CONFIG_AM_CORES = s"${YARN_AM_RESOURCE_TYPES_PREFIX}${CORES}.${AMOUNT}" + private val NEW_CONFIG_DRIVER_MEMORY = s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${MEMORY}.${AMOUNT}" + private val NEW_CONFIG_DRIVER_CORES = s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${CORES}.${AMOUNT}" test("empty SparkConf should be valid") { val sparkConf = new SparkConf() @@ -143,10 +145,10 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { Seq( NEW_CONFIG_EXECUTOR_MEMORY -> "30G", - YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory-mb" -> "30G", - YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "mb" -> "30G", + s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}memory-mb.$AMOUNT" -> "30G", + s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}mb.$AMOUNT" -> "30G", NEW_CONFIG_EXECUTOR_CORES -> "5", - YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores" -> "5", + s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}vcores.$AMOUNT" -> "5", NEW_CONFIG_AM_MEMORY -> "1G", NEW_CONFIG_DRIVER_MEMORY -> "1G", NEW_CONFIG_AM_CORES -> "3", diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 09dc361f5444f..4ac27ede64831 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -35,6 +35,7 @@ import org.apache.spark.deploy.yarn.ResourceRequestHelper._ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.config._ +import org.apache.spark.resource.ResourceUtils.{AMOUNT, GPU} import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler.SplitInfo @@ -166,7 +167,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]]) val handler = createAllocator(1, mockAmClient, - Map(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G")) + Map(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${GPU}.${AMOUNT}" -> "2G")) handler.updateResourceRequests() val container = createContainer("host1", resource = handler.resource) @@ -189,10 +190,11 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG, YARN_FPGA_RESOURCE_CONFIG, yarnMadeupResource) ResourceRequestTestHelper.initializeResourceTypes(yarnResources) val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]]) + val madeupConfigName = s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}" val sparkResources = Map(EXECUTOR_GPU_ID.amountConf -> "3", EXECUTOR_FPGA_ID.amountConf -> "2", - s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}" -> "5") + madeupConfigName -> "5") val handler = createAllocator(1, mockAmClient, sparkResources) handler.updateResourceRequests() From 6af99885c894b4fe4f360504aaf2895b74f31760 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 28 Jun 2019 14:07:02 -0500 Subject: [PATCH 5/6] fix import order: --- .../test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 77bd988b42d8b..847fc3773de59 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -38,11 +38,11 @@ import org.mockito.Mockito.{spy, verify} import org.scalatest.Matchers import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils} -import org.apache.spark.resource.ResourceUtils.AMOUNT import org.apache.spark.deploy.yarn.ResourceRequestHelper._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceID +import org.apache.spark.resource.ResourceUtils.AMOUNT import org.apache.spark.util.{SparkConfWithEnv, Utils} class ClientSuite extends SparkFunSuite with Matchers { From 6e86ab9b9db12a88998e54d6358d3ac6143e957a Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Tue, 16 Jul 2019 08:22:51 -0500 Subject: [PATCH 6/6] rework --- .../spark/deploy/yarn/ResourceRequestHelper.scala | 2 +- .../deploy/yarn/ResourceRequestHelperSuite.scala | 15 +++------------ 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala index 51766c3671c68..522c16b3a1082 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala @@ -56,7 +56,7 @@ private object ResourceRequestHelper extends Logging { val resourceName = key.substring(0, splitIndex) val resourceSuffix = key.substring(splitIndex + 1) if (!AMOUNT.equals(resourceSuffix)) { - val errorMessage = s"Unsupported suffix: $resourceSuffix in: ${componentName}${key}," + + val errorMessage = s"Unsupported suffix: $resourceSuffix in: ${componentName}${key}, " + s"only .$AMOUNT is supported." throw new IllegalArgumentException(errorMessage.toString()) } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala index efea747c11489..f5ec531e26e0c 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala @@ -67,20 +67,11 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { sparkConf.set(s"${YARN_AM_RESOURCE_TYPES_PREFIX}${name}.${AMOUNT}", value) } var parsedResources = getYarnResourcesAndAmounts(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) - assert(resources.size === 3) - resources.foreach { case (key, value) => - assert(parsedResources(key) === value) - } + assert(parsedResources === resources) parsedResources = getYarnResourcesAndAmounts(sparkConf, YARN_DRIVER_RESOURCE_TYPES_PREFIX) - assert(resources.size === 3) - resources.foreach { case (key, value) => - assert(parsedResources(key) === value) - } + assert(parsedResources === resources) parsedResources = getYarnResourcesAndAmounts(sparkConf, YARN_AM_RESOURCE_TYPES_PREFIX) - assert(resources.size === 3) - resources.foreach { case (key, value) => - assert(parsedResources(key) === value) - } + assert(parsedResources === resources) } test("get invalid yarn resources from configs") {