diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AMOptions.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AMOptions.java index 60545bee2e..7f5205d461 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AMOptions.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AMOptions.java @@ -24,11 +24,13 @@ class AMOptions { public static final String NAMENODE_MEMORY_MB_DEFAULT = "2048"; public static final String NAMENODE_VCORES_ARG = "namenode_vcores"; public static final String NAMENODE_VCORES_DEFAULT = "1"; + public static final String NAMENODE_NODELABEL_ARG = "namenode_nodelabel"; public static final String NAMENODE_ARGS_ARG = "namenode_args"; public static final String DATANODE_MEMORY_MB_ARG = "datanode_memory_mb"; public static final String DATANODE_MEMORY_MB_DEFAULT = "2048"; public static final String DATANODE_VCORES_ARG = "datanode_vcores"; public static final String DATANODE_VCORES_DEFAULT = "1"; + public static final String DATANODE_NODELABEL_ARG = "datanode_nodelabel"; public static final String DATANODE_ARGS_ARG = "datanode_args"; public static final String NAMENODE_METRICS_PERIOD_ARG = "namenode_metrics_period"; public static final String NAMENODE_METRICS_PERIOD_DEFAULT = "60"; @@ -41,11 +43,13 @@ class AMOptions { private final int datanodeMemoryMB; private final int datanodeVirtualCores; private final String datanodeArgs; + private final String datanodeNodeLabelExpression; private final int datanodesPerCluster; private final String datanodeLaunchDelay; private final int namenodeMemoryMB; private final int namenodeVirtualCores; private final String namenodeArgs; + private final String namenodeNodeLabelExpression; private final int namenodeMetricsPeriod; // Original shellEnv as passed in through arguments private final Map originalShellEnv; @@ -53,16 +57,19 @@ class AMOptions { private final Map shellEnv; AMOptions(int datanodeMemoryMB, int datanodeVirtualCores, String datanodeArgs, - int datanodesPerCluster, String datanodeLaunchDelay, int namenodeMemoryMB, int namenodeVirtualCores, - String namenodeArgs, int namenodeMetricsPeriod, Map shellEnv) { + String datanodeNodeLabelExpression, int datanodesPerCluster, String datanodeLaunchDelay, int namenodeMemoryMB, + int namenodeVirtualCores, String namenodeArgs, String namenodeNodeLabelExpression, int namenodeMetricsPeriod, + Map shellEnv) { this.datanodeMemoryMB = datanodeMemoryMB; this.datanodeVirtualCores = datanodeVirtualCores; this.datanodeArgs = datanodeArgs; + this.datanodeNodeLabelExpression = datanodeNodeLabelExpression; this.datanodesPerCluster = datanodesPerCluster; this.datanodeLaunchDelay = datanodeLaunchDelay; this.namenodeMemoryMB = namenodeMemoryMB; this.namenodeVirtualCores = namenodeVirtualCores; this.namenodeArgs = namenodeArgs; + this.namenodeNodeLabelExpression = namenodeNodeLabelExpression; this.namenodeMetricsPeriod = namenodeMetricsPeriod; this.originalShellEnv = shellEnv; this.shellEnv = new HashMap<>(this.originalShellEnv); @@ -100,6 +107,9 @@ void addToVargs(List vargs) { if (!datanodeArgs.isEmpty()) { vargs.add("--" + DATANODE_ARGS_ARG + " \\\"" + datanodeArgs + "\\\""); } + if (!datanodeNodeLabelExpression.isEmpty()) { + vargs.add("--" + DATANODE_NODELABEL_ARG + " \\\"" + datanodeNodeLabelExpression + "\\\""); + } vargs.add("--" + DATANODES_PER_CLUSTER_ARG + " " + String.valueOf(datanodesPerCluster)); vargs.add("--" + DATANODE_LAUNCH_DELAY_ARG + " " + datanodeLaunchDelay); vargs.add("--" + NAMENODE_MEMORY_MB_ARG + " " + String.valueOf(namenodeMemoryMB)); @@ -107,6 +117,9 @@ void addToVargs(List vargs) { if (!namenodeArgs.isEmpty()) { vargs.add("--" + NAMENODE_ARGS_ARG + " \\\"" + namenodeArgs + "\\\""); } + if (!namenodeNodeLabelExpression.isEmpty()) { + vargs.add("--" + NAMENODE_NODELABEL_ARG + " \\\"" + namenodeNodeLabelExpression + "\\\""); + } vargs.add("--" + NAMENODE_METRICS_PERIOD_ARG + " " + String.valueOf(namenodeMetricsPeriod)); for (Map.Entry entry : originalShellEnv.entrySet()) { vargs.add("--" + SHELL_ENV_ARG + " " + entry.getKey() + "=" + entry.getValue()); @@ -121,6 +134,10 @@ int getDataNodeVirtualCores() { return datanodeVirtualCores; } + String getDataNodeNodeLabelExpression() { + return datanodeNodeLabelExpression; + } + int getDataNodesPerCluster() { return datanodesPerCluster; } @@ -141,6 +158,10 @@ int getNameNodeVirtualCores() { return namenodeVirtualCores; } + String getNameNodeNodeLabelExpression() { + return namenodeNodeLabelExpression; + } + Map getShellEnv() { return shellEnv; } @@ -160,6 +181,8 @@ static void setOptions(Options opts) { "Ignored unless the NameNode is run within YARN."); opts.addOption(NAMENODE_ARGS_ARG, true, "Additional arguments to add when starting the NameNode. Ignored unless the NameNode is run within YARN."); + opts.addOption(NAMENODE_NODELABEL_ARG, true, + "The node label to specify for the container to use to run the NameNode."); opts.addOption(NAMENODE_METRICS_PERIOD_ARG, true, "The period in seconds for the NameNode's metrics to be emitted to file; if <=0, " + "disables this functionality. Otherwise, a metrics file will be stored in the " + @@ -169,6 +192,8 @@ static void setOptions(Options opts) { opts.addOption(DATANODE_VCORES_ARG, true, "Amount of virtual cores to be requested to run the DNs (default " + DATANODE_VCORES_DEFAULT + ")"); opts.addOption(DATANODE_ARGS_ARG, true, "Additional arguments to add when starting the DataNodes."); + opts.addOption(DATANODE_NODELABEL_ARG, true, + "The node label to specify for the container to use to run the DataNode."); opts.addOption(DATANODES_PER_CLUSTER_ARG, true, "How many simulated DataNodes to run within each YARN container " + "(default " + DATANODES_PER_CLUSTER_DEFAULT + ")"); opts.addOption(DATANODE_LAUNCH_DELAY_ARG, true, "The period over which to launch the DataNodes; this will " + @@ -207,11 +232,13 @@ static AMOptions initFromParser(CommandLine cliParser) { Integer.parseInt(cliParser.getOptionValue(DATANODE_MEMORY_MB_ARG, DATANODE_MEMORY_MB_DEFAULT)), Integer.parseInt(cliParser.getOptionValue(DATANODE_VCORES_ARG, DATANODE_VCORES_DEFAULT)), cliParser.getOptionValue(DATANODE_ARGS_ARG, ""), + cliParser.getOptionValue(DATANODE_NODELABEL_ARG, ""), Integer.parseInt(cliParser.getOptionValue(DATANODES_PER_CLUSTER_ARG, DATANODES_PER_CLUSTER_DEFAULT)), cliParser.getOptionValue(DATANODE_LAUNCH_DELAY_ARG, DATANODE_LAUNCH_DELAY_DEFAULT), Integer.parseInt(cliParser.getOptionValue(NAMENODE_MEMORY_MB_ARG, NAMENODE_MEMORY_MB_DEFAULT)), Integer.parseInt(cliParser.getOptionValue(NAMENODE_VCORES_ARG, NAMENODE_VCORES_DEFAULT)), cliParser.getOptionValue(NAMENODE_ARGS_ARG, ""), + cliParser.getOptionValue(NAMENODE_NODELABEL_ARG, ""), Integer.parseInt(cliParser.getOptionValue(NAMENODE_METRICS_PERIOD_ARG, NAMENODE_METRICS_PERIOD_DEFAULT)), originalShellEnv); } diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java index d7b86c8413..eff89ce1bc 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java @@ -295,8 +295,8 @@ public Boolean get() { Optional namenodeProperties = Optional.absent(); if (launchNameNode) { - ContainerRequest nnContainerRequest = setupContainerAskForRM( - amOptions.getNameNodeMemoryMB(), amOptions.getNameNodeVirtualCores(), 0); + ContainerRequest nnContainerRequest = setupContainerAskForRM(amOptions.getNameNodeMemoryMB(), + amOptions.getNameNodeVirtualCores(), 0, amOptions.getNameNodeNodeLabelExpression()); LOG.info("Requested NameNode ask: " + nnContainerRequest.toString()); amRMClient.addContainerRequest(nnContainerRequest); @@ -333,7 +333,7 @@ public Boolean get() { amOptions.getDataNodeVirtualCores() + " vcores, "); for (int i = 0; i < numTotalDataNodeContainers; ++i) { ContainerRequest datanodeAsk = setupContainerAskForRM(amOptions.getDataNodeMemoryMB(), - amOptions.getDataNodeVirtualCores(), 1); + amOptions.getDataNodeVirtualCores(), 1, amOptions.getDataNodeNodeLabelExpression()); amRMClient.addContainerRequest(datanodeAsk); LOG.debug("Requested datanode ask: " + datanodeAsk.toString()); } @@ -789,7 +789,7 @@ private boolean isDataNode(ContainerId containerId) { * * @return the setup ResourceRequest to be sent to RM */ - private ContainerRequest setupContainerAskForRM(int memory, int vcores, int priority) { + private ContainerRequest setupContainerAskForRM(int memory, int vcores, int priority, String nodeLabel) { Priority pri = Records.newRecord(Priority.class); pri.setPriority(priority); @@ -799,7 +799,7 @@ private ContainerRequest setupContainerAskForRM(int memory, int vcores, int prio capability.setMemory(memory); capability.setVirtualCores(vcores); - return new ContainerRequest(capability, null, null, pri); + return new ContainerRequest(capability, null, null, pri, true, nodeLabel); } } diff --git a/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java b/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java index 22da18dedc..e77e3760e5 100644 --- a/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java +++ b/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java @@ -6,6 +6,7 @@ import com.google.common.base.Optional; import com.google.common.base.Supplier; +import com.google.common.collect.Sets; import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogDirectParser; import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper; import java.io.File; @@ -14,8 +15,11 @@ import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.UUID; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -38,10 +42,18 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -81,7 +93,7 @@ public class TestDynamometerInfra { private static final Log LOG = LogFactory.getLog(TestDynamometerInfra.class); - private static final int MINICLUSTER_NUM_NMS = 1; + private static final int MINICLUSTER_NUM_NMS = 3; private static final int MINICLUSTER_NUM_DNS = 1; private static final String HADOOP_BIN_PATH_KEY = "dyno.hadoop.bin.path"; @@ -90,6 +102,9 @@ public class TestDynamometerInfra { private static final String FSIMAGE_FILENAME = "fsimage_0000000000000061740"; private static final String VERSION_FILENAME = "VERSION"; + private static final String NAMENODE_NODELABEL = "dyno_namenode"; + private static final String DATANODE_NODELABEL = "dyno_datanode"; + private static MiniDFSCluster miniDFSCluster; private static MiniYARNCluster miniYARNCluster; private static YarnClient yarnClient; @@ -144,12 +159,23 @@ public static void setupClass() throws Exception { fail("Unable to execute tar to expand Hadoop binary"); } - conf.setBoolean("yarn.minicluster.fixed.ports", true); - conf.setBoolean("yarn.minicluster.use-rpc", true); - conf.setInt("yarn.scheduler.minimum-allocation-mb", 128); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); + conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + for (String q : new String[] { "root", "root.default" } ) { + conf.setInt(CapacitySchedulerConfiguration.PREFIX + q + "." + CapacitySchedulerConfiguration.CAPACITY, 100); + String accessibleNodeLabelPrefix = CapacitySchedulerConfiguration.PREFIX + q + "." + + CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS; + conf.set(accessibleNodeLabelPrefix, CapacitySchedulerConfiguration.ALL_ACL); + conf.setInt( + accessibleNodeLabelPrefix + "." + DATANODE_NODELABEL + "." + CapacitySchedulerConfiguration.CAPACITY, 100); + conf.setInt( + accessibleNodeLabelPrefix + "." + NAMENODE_NODELABEL + "." + CapacitySchedulerConfiguration.CAPACITY, 100); + } // This is necessary to have the RM respect our vcore allocation request - conf.set("yarn.scheduler.capacity.resource-calculator", - "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"); + conf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class, ResourceCalculator.class); miniYARNCluster = new MiniYARNCluster( TestDynamometerInfra.class.getName(), 1, MINICLUSTER_NUM_NMS, 1, 1); miniYARNCluster.init(conf); @@ -176,6 +202,13 @@ public static void setupClass() throws Exception { uploadFsimageResourcesToHDFS(hadoopBinVersion); miniYARNCluster.waitForNodeManagersToConnect(30000); + + RMNodeLabelsManager nodeLabelManager = miniYARNCluster.getResourceManager().getRMContext().getNodeLabelManager(); + nodeLabelManager.addToCluserNodeLabels(Sets.newHashSet(NAMENODE_NODELABEL, DATANODE_NODELABEL)); + Map> nodeLabels = new HashMap<>(); + nodeLabels.put(miniYARNCluster.getNodeManager(0).getNMContext().getNodeId(), Sets.newHashSet(NAMENODE_NODELABEL)); + nodeLabels.put(miniYARNCluster.getNodeManager(1).getNMContext().getNodeId(), Sets.newHashSet(DATANODE_NODELABEL)); + nodeLabelManager.addLabelsToNode(nodeLabels); } @AfterClass @@ -228,8 +261,10 @@ public void run() { "-" + Client.HADOOP_BINARY_PATH_ARG, hadoopTarballPath.getAbsolutePath(), "-" + AMOptions.DATANODES_PER_CLUSTER_ARG, "2", "-" + AMOptions.DATANODE_MEMORY_MB_ARG, "128", + "-" + AMOptions.DATANODE_NODELABEL_ARG, DATANODE_NODELABEL, "-" + AMOptions.NAMENODE_MEMORY_MB_ARG, "256", "-" + AMOptions.NAMENODE_METRICS_PERIOD_ARG, "1", + "-" + AMOptions.NAMENODE_NODELABEL_ARG, NAMENODE_NODELABEL, "-" + AMOptions.SHELL_ENV_ARG, "HADOOP_HOME=" + getHadoopHomeLocation(), "-" + AMOptions.SHELL_ENV_ARG, "HADOOP_CONF_DIR=" + getHadoopHomeLocation() + "/etc/hadoop", "-" + Client.WORKLOAD_REPLAY_ENABLE_ARG, @@ -296,6 +331,15 @@ public Boolean get() { throw e; } + Map namenodeContainers = miniYARNCluster.getNodeManager(0).getNMContext().getContainers(); + Map datanodeContainers = miniYARNCluster.getNodeManager(1).getNMContext().getContainers(); + Map amContainers = miniYARNCluster.getNodeManager(2).getNMContext().getContainers(); + assertEquals(1, namenodeContainers.size()); + assertEquals(2, namenodeContainers.keySet().iterator().next().getContainerId()); + assertEquals(2, datanodeContainers.size()); + assertEquals(1, amContainers.size()); + assertEquals(1, amContainers.keySet().iterator().next().getContainerId()); + LOG.info("Waiting for workload job to start and complete"); GenericTestUtils.waitFor(new Supplier() { @Override