From 5bddd65a28d930ae6cc22785a84c92c6a26c2983 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Mon, 9 Oct 2017 13:26:23 -0700 Subject: [PATCH] Closes #4. DataNodes are launched via SimulatedDataNodes, which uses MiniDFSCluster to launch many DataNodes in the same JVM. Additionally, SimulatedFSDataset is used to avoid the requirement of writing sparse files to the disk - all block metadata is now stored in-memory. --- NOTICE | 227 +++ README.md | 7 +- build.gradle | 1 + dynamometer-infra/build.gradle | 1 + .../src/main/bash/create-slim-hadoop-tar.sh | 2 +- .../com/linkedin/dynamometer/AMOptions.java | 37 +- .../dynamometer/ApplicationMaster.java | 63 +- .../BlockPlacementPolicyAlwaysSatisfied.java | 36 + .../java/com/linkedin/dynamometer/Client.java | 8 +- .../linkedin/dynamometer/DynoConstants.java | 8 +- .../dynamometer/SimulatedDataNodes.java | 153 ++ .../SimulatedMultiStorageFSDataset.java | 1407 +++++++++++++++++ .../main/resources/scripts/create-dn-dir.sh | 141 -- .../src/main/resources/scripts/metafile | Bin 11 -> 0 bytes .../{scripts => }/start-component.sh | 87 +- .../dynamometer/TestDynamometerInfra.java | 11 +- 16 files changed, 1972 insertions(+), 217 deletions(-) create mode 100644 dynamometer-infra/src/main/java/com/linkedin/dynamometer/BlockPlacementPolicyAlwaysSatisfied.java create mode 100644 dynamometer-infra/src/main/java/com/linkedin/dynamometer/SimulatedDataNodes.java create mode 100644 dynamometer-infra/src/main/java/com/linkedin/dynamometer/SimulatedMultiStorageFSDataset.java delete mode 100644 dynamometer-infra/src/main/resources/scripts/create-dn-dir.sh delete mode 100644 dynamometer-infra/src/main/resources/scripts/metafile rename dynamometer-infra/src/main/resources/{scripts => }/start-component.sh (80%) diff --git a/NOTICE b/NOTICE index 7c3ca52840..2171bef1f0 100644 --- a/NOTICE +++ b/NOTICE @@ -3,3 +3,230 @@ All Rights Reserved. Licensed under the BSD 2-Clause License (the "License"). 
See LICENSE in the project root for license information. + +================================================================================ + +This product automatically loads third party code from an external repository +using the Gradle build system. Such third party code is subject to other license +terms than as set forth above. Please review the complete list of dependencies for +applicable license terms. + +In addition, such third party code may also depend on and load multiple tiers of +dependencies. Please review the applicable licenses of the additional dependencies. + +================================================================================ + +This product bundles a modified version of the +org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset class from +Apache Hadoop (http://hadoop.apache.org/) 2.7.4, which has the following notice: + +This product includes software developed by The Apache Software +Foundation (http://www.apache.org/). + +License: Apache 2.0 + +================================================================================ + +See below for the Apache 2.0 license in its entirety. + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md index 6855f9f8fa..fc43db67f9 100644 --- a/README.md +++ b/README.md @@ -163,10 +163,8 @@ First launch the infrastructure application to begin the startup of the internal -conf_path my-hadoop-conf -fs_image_dir hdfs:///fsimage -block_list_path hdfs:///dyno/blocks - -datanode_layout_version -56 ``` -This demonstrates the required arguments. `-datanode_layout_version` is necessary to determine how the block -files should be laid out on the DataNodes that are launched. +This demonstrates the required arguments. You can run this with the `-help` flag to see further usage information. The client will track the Dyno-NN's startup progress and how many Dyno-DNs it considers live. It will notify via logging when the Dyno-NN has exited safemode and is ready for use. @@ -200,11 +198,10 @@ launch an integrated application with the same parameters as were used above, th -conf_path my-hadoop-conf -fs_image_dir hdfs:///fsimage -block_list_path hdfs:///dyno/blocks - -datanode_layout_version -56 -workload_replay_enable -workload_input_path hdfs:///dyno/audit_logs/ -workload_threads_per_mapper 50 -workload_start_delay 5m ``` When run in this way, the client will automatically handle tearing down the Dyno-HDFS cluster once the -workload has completed. +workload has completed. To see the full list of supported parameters, run this with the `-help` flag. 
diff --git a/build.gradle b/build.gradle index 60663f10dc..296ef141c2 100644 --- a/build.gradle +++ b/build.gradle @@ -23,6 +23,7 @@ ext.deps = [ 'yarn-common': "org.apache.hadoop:hadoop-yarn-common:${hadoopVersion}", 'mapreduce-client-core': "org.apache.hadoop:hadoop-mapreduce-client-core:${hadoopVersion}", minicluster: "org.apache.hadoop:hadoop-minicluster:${hadoopVersion}", + 'hdfs-test': "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}:tests", ] ] diff --git a/dynamometer-infra/build.gradle b/dynamometer-infra/build.gradle index 7c0555e157..e9eb624020 100644 --- a/dynamometer-infra/build.gradle +++ b/dynamometer-infra/build.gradle @@ -9,6 +9,7 @@ dependencies { compile deps.hadoop.'yarn-api' compile deps.hadoop.'yarn-client' compile deps.hadoop.'yarn-common' + compile deps.hadoop.'hdfs-test' compile project(':dynamometer-workload') testCompile project(path: ':dynamometer-workload', configuration: 'testArtifacts') diff --git a/dynamometer-infra/src/main/bash/create-slim-hadoop-tar.sh b/dynamometer-infra/src/main/bash/create-slim-hadoop-tar.sh index c089234f24..07bc0d8179 100755 --- a/dynamometer-infra/src/main/bash/create-slim-hadoop-tar.sh +++ b/dynamometer-infra/src/main/bash/create-slim-hadoop-tar.sh @@ -27,7 +27,7 @@ hadoopShare="$baseDir/share/hadoop" # Remove unnecessary files rm -rf ${baseDir}/share/doc ${hadoopShare}/mapreduce ${hadoopShare}/yarn \ ${hadoopShare}/kms ${hadoopShare}/tools ${hadoopShare}/httpfs \ - ${hadoopShare}/*/sources ${hadoopShare}/*/jdiff ${hadoopShare}/*/*-tests.jar + ${hadoopShare}/*/sources ${hadoopShare}/*/jdiff tar czf "$hadoopTarTmp.tar.gz" -C "$hadoopTarTmp" . 
rm -rf "$hadoopTarTmp" diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AMOptions.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AMOptions.java index dc94b5d4c4..60545bee2e 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AMOptions.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AMOptions.java @@ -8,8 +8,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; /** @@ -31,10 +33,16 @@ class AMOptions { public static final String NAMENODE_METRICS_PERIOD_ARG = "namenode_metrics_period"; public static final String NAMENODE_METRICS_PERIOD_DEFAULT = "60"; public static final String SHELL_ENV_ARG = "shell_env"; + public static final String DATANODES_PER_CLUSTER_ARG = "datanodes_per_cluster"; + public static final String DATANODES_PER_CLUSTER_DEFAULT = "1"; + public static final String DATANODE_LAUNCH_DELAY_ARG = "datanode_launch_delay"; + public static final String DATANODE_LAUNCH_DELAY_DEFAULT = "0s"; private final int datanodeMemoryMB; private final int datanodeVirtualCores; private final String datanodeArgs; + private final int datanodesPerCluster; + private final String datanodeLaunchDelay; private final int namenodeMemoryMB; private final int namenodeVirtualCores; private final String namenodeArgs; @@ -45,11 +53,13 @@ class AMOptions { private final Map shellEnv; AMOptions(int datanodeMemoryMB, int datanodeVirtualCores, String datanodeArgs, - int namenodeMemoryMB, int namenodeVirtualCores, String namenodeArgs, - int namenodeMetricsPeriod, Map shellEnv) { + int datanodesPerCluster, String datanodeLaunchDelay, int namenodeMemoryMB, int namenodeVirtualCores, + String namenodeArgs, int namenodeMetricsPeriod, Map shellEnv) { this.datanodeMemoryMB = datanodeMemoryMB; this.datanodeVirtualCores = 
datanodeVirtualCores; this.datanodeArgs = datanodeArgs; + this.datanodesPerCluster = datanodesPerCluster; + this.datanodeLaunchDelay = datanodeLaunchDelay; this.namenodeMemoryMB = namenodeMemoryMB; this.namenodeVirtualCores = namenodeVirtualCores; this.namenodeArgs = namenodeArgs; @@ -74,6 +84,7 @@ void verify(int maxMemory, int maxVcores) throws IllegalArgumentException { "namenodeMemoryMB (%s) must be between 0 and %s", namenodeMemoryMB, maxMemory); Preconditions.checkArgument(namenodeVirtualCores > 0 && namenodeVirtualCores <= maxVcores, "namenodeVirtualCores (%s) must be between 0 and %s", namenodeVirtualCores, maxVcores); + Preconditions.checkArgument(datanodesPerCluster > 0, "datanodesPerCluster (%s) must be > 0", datanodesPerCluster); } /** @@ -89,6 +100,8 @@ void addToVargs(List vargs) { if (!datanodeArgs.isEmpty()) { vargs.add("--" + DATANODE_ARGS_ARG + " \\\"" + datanodeArgs + "\\\""); } + vargs.add("--" + DATANODES_PER_CLUSTER_ARG + " " + String.valueOf(datanodesPerCluster)); + vargs.add("--" + DATANODE_LAUNCH_DELAY_ARG + " " + datanodeLaunchDelay); vargs.add("--" + NAMENODE_MEMORY_MB_ARG + " " + String.valueOf(namenodeMemoryMB)); vargs.add("--" + NAMENODE_VCORES_ARG + " " + String.valueOf(namenodeVirtualCores)); if (!namenodeArgs.isEmpty()) { @@ -108,6 +121,18 @@ int getDataNodeVirtualCores() { return datanodeVirtualCores; } + int getDataNodesPerCluster() { + return datanodesPerCluster; + } + + long getDataNodeLaunchDelaySec() { + // Leverage the human-readable time parsing capabilities of Configuration + String tmpConfKey = "___temp_config_property___"; + Configuration tmpConf = new Configuration(); + tmpConf.set(tmpConfKey, datanodeLaunchDelay); + return tmpConf.getTimeDuration(tmpConfKey, 0, TimeUnit.SECONDS); + } + int getNameNodeMemoryMB() { return namenodeMemoryMB; } @@ -144,6 +169,12 @@ static void setOptions(Options opts) { opts.addOption(DATANODE_VCORES_ARG, true, "Amount of virtual cores to be requested to run the DNs (default " + 
DATANODE_VCORES_DEFAULT + ")"); opts.addOption(DATANODE_ARGS_ARG, true, "Additional arguments to add when starting the DataNodes."); + opts.addOption(DATANODES_PER_CLUSTER_ARG, true, "How many simulated DataNodes to run within each YARN container " + + "(default " + DATANODES_PER_CLUSTER_DEFAULT + ")"); + opts.addOption(DATANODE_LAUNCH_DELAY_ARG, true, "The period over which to launch the DataNodes; this will " + + "be used as the maximum delay and each DataNode container will be launched with some random delay less than " + + "this value. Accepts human-readable time durations (e.g. 10s, 1m) (default " + + DATANODE_LAUNCH_DELAY_DEFAULT + ")"); opts.addOption("help", false, "Print usage"); } @@ -176,6 +207,8 @@ static AMOptions initFromParser(CommandLine cliParser) { Integer.parseInt(cliParser.getOptionValue(DATANODE_MEMORY_MB_ARG, DATANODE_MEMORY_MB_DEFAULT)), Integer.parseInt(cliParser.getOptionValue(DATANODE_VCORES_ARG, DATANODE_VCORES_DEFAULT)), cliParser.getOptionValue(DATANODE_ARGS_ARG, ""), + Integer.parseInt(cliParser.getOptionValue(DATANODES_PER_CLUSTER_ARG, DATANODES_PER_CLUSTER_DEFAULT)), + cliParser.getOptionValue(DATANODE_LAUNCH_DELAY_ARG, DATANODE_LAUNCH_DELAY_DEFAULT), Integer.parseInt(cliParser.getOptionValue(NAMENODE_MEMORY_MB_ARG, NAMENODE_MEMORY_MB_DEFAULT)), Integer.parseInt(cliParser.getOptionValue(NAMENODE_VCORES_ARG, NAMENODE_VCORES_DEFAULT)), cliParser.getOptionValue(NAMENODE_ARGS_ARG, ""), diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java index 44183fb57f..7fde3b1afc 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java @@ -8,6 +8,7 @@ import com.google.common.base.Optional; import com.google.common.base.Supplier; import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; 
import java.io.IOException; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -19,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -93,10 +95,8 @@ @InterfaceStability.Unstable public class ApplicationMaster { - // Location of the script used to start the DataNode/NameNode processes - private static final String START_SCRIPT_PATH = "scripts/start-component.sh"; - private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); + private static final Random RAND = new Random(); // Configuration private Configuration conf; @@ -114,14 +114,15 @@ public class ApplicationMaster { private List blockListFiles; private int numTotalDataNodes; + private int numTotalDataNodeContainers; // Counter for completed datanodes (complete denotes successful or failed ) - private AtomicInteger numCompletedDataNodes = new AtomicInteger(); + private AtomicInteger numCompletedDataNodeContainers = new AtomicInteger(); // Allocated datanode count so that we know how many datanodes has the RM // allocated to us - private AtomicInteger numAllocatedDataNodes = new AtomicInteger(); + private AtomicInteger numAllocatedDataNodeContainers = new AtomicInteger(); // Count of failed datanodes - private AtomicInteger numFailedDataNodes = new AtomicInteger(); + private AtomicInteger numFailedDataNodeContainers = new AtomicInteger(); // True iff the application has completed and is ready for cleanup // Once true, will never be false. 
This variable should not be accessed @@ -319,12 +320,15 @@ public Boolean get() { markCompleted(); return false; } + numTotalDataNodeContainers = (int) Math.ceil( + ((double) numTotalDataNodes) / Math.max(1, amOptions.getDataNodesPerCluster())); - LOG.info("Requesting " + numTotalDataNodes + " DataNodes with " + amOptions.getDataNodeMemoryMB() + "MB memory, " + + LOG.info("Requesting " + numTotalDataNodeContainers + " DataNode containers with " + + amOptions.getDataNodeMemoryMB() + "MB memory, " + amOptions.getDataNodeVirtualCores() + " vcores, "); - for (int i = 0; i < numTotalDataNodes; ++i) { - ContainerRequest datanodeAsk = - setupContainerAskForRM(amOptions.getDataNodeMemoryMB(), amOptions.getDataNodeVirtualCores(), 1); + for (int i = 0; i < numTotalDataNodeContainers; ++i) { + ContainerRequest datanodeAsk = setupContainerAskForRM(amOptions.getDataNodeMemoryMB(), + amOptions.getDataNodeVirtualCores(), 1); amRMClient.addContainerRequest(datanodeAsk); LOG.debug("Requested datanode ask: " + datanodeAsk.toString()); } @@ -400,14 +404,14 @@ private boolean cleanup() { FinalApplicationStatus appStatus; String appMessage = null; boolean success; - if (numFailedDataNodes.get() == 0 && numCompletedDataNodes.get() == numTotalDataNodes) { + if (numFailedDataNodeContainers.get() == 0 && numCompletedDataNodeContainers.get() == numTotalDataNodeContainers) { appStatus = FinalApplicationStatus.SUCCEEDED; success = true; } else { appStatus = FinalApplicationStatus.FAILED; - appMessage = "Diagnostics." + ", total=" + numTotalDataNodes - + ", completed=" + numCompletedDataNodes.get() + ", allocated=" - + numAllocatedDataNodes.get() + ", failed=" + numFailedDataNodes.get(); + appMessage = "Diagnostics."
+ ", total=" + numTotalDataNodeContainers + + ", completed=" + numCompletedDataNodeContainers.get() + ", allocated=" + + numAllocatedDataNodeContainers.get() + ", failed=" + numFailedDataNodeContainers.get(); success = false; } try { @@ -454,16 +458,16 @@ public void onContainersCompleted(List completedContainers) { // increment counters for completed/failed containers int exitStatus = containerStatus.getExitStatus(); - int completed = numCompletedDataNodes.incrementAndGet(); + int completed = numCompletedDataNodeContainers.incrementAndGet(); if (0 != exitStatus) { - numFailedDataNodes.incrementAndGet(); + numFailedDataNodeContainers.incrementAndGet(); } else { LOG.info("DataNode " + completed + " completed successfully, containerId=" + containerStatus.getContainerId()); } } - if (numCompletedDataNodes.get() == numTotalDataNodes) { + if (numCompletedDataNodeContainers.get() == numTotalDataNodeContainers) { LOG.info("All datanode containers completed; marking application as done"); markCompleted(); } @@ -483,14 +487,14 @@ public void onContainersAllocated(List allocatedContainers) { containerLauncher = new LaunchContainerRunnable(container, true, containerListener); } else if (rsrc.getMemory() >= amOptions.getDataNodeMemoryMB() && rsrc.getVirtualCores() >= amOptions.getDataNodeVirtualCores() - && numAllocatedDataNodes.get() < numTotalDataNodes) { + && numAllocatedDataNodeContainers.get() < numTotalDataNodeContainers) { if (launchNameNode && namenodeContainer == null) { LOG.error("Received a container with following resources suited " + "for a DataNode but no NameNode container exists: containerMem=" + rsrc.getMemory() + ", containerVcores=" + rsrc.getVirtualCores()); continue; } - numAllocatedDataNodes.getAndIncrement(); + numAllocatedDataNodeContainers.getAndIncrement(); datanodeContainers.put(container.getId(), container); componentType = "DATANODE"; containerLauncher = new LaunchContainerRunnable(container, false, containerListener); @@ -587,8 +591,8 @@ public void
onStartContainerError(ContainerId containerId, Throwable t) { } else if (isDataNode(containerId)) { LOG.error("Failed to start DataNode Container " + containerId); datanodeContainers.remove(containerId); - numCompletedDataNodes.incrementAndGet(); - numFailedDataNodes.incrementAndGet(); + numCompletedDataNodeContainers.incrementAndGet(); + numFailedDataNodeContainers.incrementAndGet(); } else { LOG.error("onStartContainerError received unknown container ID: " + containerId); } @@ -676,15 +680,22 @@ private Map getLocalResources() throws IOException { Map envs = System.getenv(); addAsLocalResourceFromEnv(DynoConstants.CONF_ZIP, localResources, envs); - addAsLocalResourceFromEnv(DynoConstants.SCRIPTS_ZIP, localResources, envs); + addAsLocalResourceFromEnv(DynoConstants.START_SCRIPT, localResources, envs); addAsLocalResourceFromEnv(DynoConstants.HADOOP_BINARY, localResources, envs); addAsLocalResourceFromEnv(DynoConstants.VERSION, localResources, envs); + addAsLocalResourceFromEnv(DynoConstants.DYNO_JAR, localResources, envs); if (isNameNodeLauncher) { addAsLocalResourceFromEnv(DynoConstants.FS_IMAGE, localResources, envs); addAsLocalResourceFromEnv(DynoConstants.FS_IMAGE_MD5, localResources, envs); } else { - addAsLocalResourceFromEnv(DynoConstants.DYNO_JAR, localResources, envs); - localResources.put(DynoConstants.BLOCK_LIST_RESOURCE_PATH, blockListFiles.remove(0)); + int blockFilesToLocalize = Math.max(1, amOptions.getDataNodesPerCluster()); + for (int i = 0; i < blockFilesToLocalize; i++) { + try { + localResources.put(DynoConstants.BLOCK_LIST_RESOURCE_PATH_PREFIX + i, blockListFiles.remove(0)); + } catch (IndexOutOfBoundsException e) { + break; + } + } } return localResources; } @@ -697,13 +708,15 @@ private List getContainerStartCommand() throws IOException { List vargs = new ArrayList<>(); // Set executable command - vargs.add(START_SCRIPT_PATH); + vargs.add("./" + DynoConstants.START_SCRIPT.getResourcePath()); String component = isNameNodeLauncher ? 
"namenode" : "datanode"; vargs.add(component); if (isNameNodeLauncher) { vargs.add(remoteStoragePath.getFileSystem(conf).makeQualified(remoteStoragePath).toString()); } else { vargs.add(namenodeServiceRpcAddress); + vargs.add(String.valueOf(amOptions.getDataNodeLaunchDelaySec() < 1 ? 0 : + RAND.nextInt(Ints.checkedCast(amOptions.getDataNodeLaunchDelaySec())))); } // Add log redirect params diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/BlockPlacementPolicyAlwaysSatisfied.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/BlockPlacementPolicyAlwaysSatisfied.java new file mode 100644 index 0000000000..53b7b79b1f --- /dev/null +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/BlockPlacementPolicyAlwaysSatisfied.java @@ -0,0 +1,36 @@ +/** + * Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ +package com.linkedin.dynamometer; + +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; + + +/** + * A BlockPlacementPolicy which always considered itself satisfied. This avoids the issue that the Dynamometer + * NameNode will complain about blocks being under-replicated because they're not being put on distinct racks. 
+ */ +public class BlockPlacementPolicyAlwaysSatisfied extends BlockPlacementPolicyDefault { + + private static final BlockPlacementStatusSatisfied SATISFIED = new BlockPlacementStatusSatisfied(); + + private static class BlockPlacementStatusSatisfied implements BlockPlacementStatus { + @Override + public boolean isPlacementPolicySatisfied() { + return true; + } + + public String getErrorDescription() { + return null; + } + } + + @Override + public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, int numberOfReplicas) { + return SATISFIED; + } + +} diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index 9adf535da1..3b6c54cbd2 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -132,6 +132,9 @@ public class Client extends Configured implements Tool { public static final String WORKLOAD_THREADS_PER_MAPPER_ARG = "workload_threads_per_mapper"; public static final String WORKLOAD_START_DELAY_ARG = "workload_start_delay"; + private static final String START_SCRIPT_LOCATION = + Client.class.getClassLoader().getResource(DynoConstants.START_SCRIPT.getResourcePath()).toString(); + private YarnClient yarnClient; // Application master specific info to register a new Application with RM/ASM private String appName = ""; @@ -148,8 +151,6 @@ public class Client extends Configured implements Tool { private String hadoopBinary = ""; // Location of DN conf zip private String confPath = ""; - // Location of scripts zip - private String scriptsPath = ""; // Location of root dir for DN block image zips private String blockListPath = ""; // Location of NN fs image @@ -346,7 +347,6 @@ public boolean accept(Path path) { this.amMemory = Integer.parseInt(cliParser.getOptionValue(MASTER_MEMORY_MB_ARG, MASTER_MEMORY_MB_DEFAULT)); this.amVCores = 
Integer.parseInt(cliParser.getOptionValue(MASTER_VCORES_ARG, MASTER_VCORES_DEFAULT)); this.confPath = cliParser.getOptionValue(CONF_PATH_ARG); - this.scriptsPath = Client.class.getClassLoader().getResource("scripts").getPath(); this.blockListPath = cliParser.getOptionValue(BLOCK_LIST_PATH_ARG); if (cliParser.hasOption(HADOOP_BINARY_PATH_ARG)) { this.hadoopBinary = cliParser.getOptionValue(HADOOP_BINARY_PATH_ARG); @@ -511,7 +511,7 @@ private Map setupRemoteResourcesGetEnv() throws IOException { } setupRemoteResource(versionFilePath, infraAppId, DynoConstants.VERSION, env); setupRemoteResource(confPath, infraAppId, DynoConstants.CONF_ZIP, env); - setupRemoteResource(scriptsPath, infraAppId, DynoConstants.SCRIPTS_ZIP, env); + setupRemoteResource(START_SCRIPT_LOCATION, infraAppId, DynoConstants.START_SCRIPT, env); setupRemoteResource(hadoopBinary, infraAppId, DynoConstants.HADOOP_BINARY, env); setupRemoteResource(appMasterJar, infraAppId, DynoConstants.DYNO_JAR, env); diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoConstants.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoConstants.java index d5e9a85ae7..e4d82073d7 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoConstants.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoConstants.java @@ -31,8 +31,8 @@ public class DynoConstants { public static final DynoResource CONF_ZIP = new DynoResource("CONF_ZIP", ARCHIVE, "conf"); // Resource for the Hadoop binary archive (distribution tar) public static final DynoResource HADOOP_BINARY = new DynoResource("HADOOP_BINARY", ARCHIVE, "hadoopBinary"); - // Resource for the zip file for the scripts used by the DataNodes/NameNode - public static final DynoResource SCRIPTS_ZIP = new DynoResource("SCRIPTS_ZIP", ARCHIVE, "scripts"); + // Resource for the script used to start the DataNodes/NameNode + public static final DynoResource START_SCRIPT = new DynoResource("START_SCRIPT", FILE, 
"start-component.sh"); // Resource for the file system image file used by the NameNode public static final DynoResource FS_IMAGE = new DynoResource("FS_IMAGE", FILE, null); // Resource for the md5 file accompanying the file system image for the NameNode @@ -47,8 +47,8 @@ public class DynoConstants { public static final String BLOCK_LIST_PATH_ENV = "BLOCK_ZIP_PATH"; // The format of the name of a single block file public static final Pattern BLOCK_LIST_FILE_PATTERN = Pattern.compile("dn[0-9]+-a-[0-9]+-r-[0-9]+"); - // The file name to use when localizing the block file on a DataNode - public static final String BLOCK_LIST_RESOURCE_PATH = "block"; + // The file name to use when localizing the block file on a DataNode; will be suffixed with an integer + public static final String BLOCK_LIST_RESOURCE_PATH_PREFIX = "blocks/block"; public static final PathFilter BLOCK_LIST_FILE_FILTER = new PathFilter() { @Override public boolean accept(Path path) { diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/SimulatedDataNodes.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/SimulatedDataNodes.java new file mode 100644 index 0000000000..37a7b2ede6 --- /dev/null +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/SimulatedDataNodes.java @@ -0,0 +1,153 @@ +/** + * Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. 
+ */ +package com.linkedin.dynamometer; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + + +/** + * Starts up a number of DataNodes within the same JVM. These DataNodes all use + * {@link org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset}, so they do not store any actual data, and do not + * persist anything to disk; they maintain all metadata in memory. This is useful for testing and simulation purposes. + *

+ * The DataNodes will attempt to connect to a NameNode defined by the default FileSystem. There will be one DataNode + * started for each block list file passed as an argument. Each of these files should contain a list of blocks that + * the corresponding DataNode should contain, as specified by a triplet of block ID, generation stamp, and block size. + * Each line of the file is one block, in the format: + *

+ * {@code blockID,blockGenStamp,blockSize} + *

+ * This class is loosely based on {@link org.apache.hadoop.hdfs.DataNodeCluster}. + */ +public class SimulatedDataNodes extends Configured implements Tool { + + // Set this arbitrarily large (200TB, since 100 * 2L<<40 parses as (100 * 2L) << 40); we don't care about storage capacity + private static final long STORAGE_CAPACITY = 100 * 2L<<40; + private static final String USAGE = + "Usage: com.linkedin.dynamometer.SimulatedDataNodes bpid blockListFile1 [ blockListFileN ... ]\n" + + " bpid should be the ID of the block pool to which these DataNodes belong.\n" + + " Each blockListFile specified should contain a list of blocks to be served by one DataNode.\n" + + " See the Javadoc of this class for more detail."; + + static void printUsageExit(String err) { + System.out.println(err); + System.out.println(USAGE); + System.exit(1); + } + + public static void main(String[] args) throws Exception { + SimulatedDataNodes datanodes = new SimulatedDataNodes(); + ToolRunner.run(new HdfsConfiguration(), datanodes, args); + } + + public int run(String[] args) throws Exception { + if (args.length < 2) { + printUsageExit("Not enough arguments"); + } + String bpid = args[0]; + List<Path> blockListFiles = new ArrayList<>(); + for (int i = 1; i < args.length; i++) { + blockListFiles.add(new Path(args[i])); + } + + URI defaultFS = FileSystem.getDefaultUri(getConf()); + if (!HdfsConstants.HDFS_URI_SCHEME.equals(defaultFS.getScheme())) { + printUsageExit("Must specify an HDFS-based default FS! 
Got <" + defaultFS + ">"); + } + String nameNodeAdr = defaultFS.getAuthority(); + if (nameNodeAdr == null) { + printUsageExit("No NameNode address and port in config"); + } + System.out.println("DataNodes will connect to NameNode at " + nameNodeAdr); + + System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, + DataNode.getStorageLocations(getConf()).get(0).getFile().getAbsolutePath()); + SimulatedMultiStorageFSDataset.setFactory(getConf()); + getConf().setLong(SimulatedMultiStorageFSDataset.CONFIG_PROPERTY_CAPACITY, STORAGE_CAPACITY); + + UserGroupInformation.setConfiguration(getConf()); + MiniDFSCluster mc = new MiniDFSCluster(); + try { + mc.formatDataNodeDirs(); + } catch (IOException e) { + System.out.println("Error formatting DataNode dirs: " + e); + System.exit(1); + } + + try { + System.out.println("Found " + blockListFiles.size() + " block listing files; launching DataNodes accordingly."); + mc.startDataNodes(getConf(), blockListFiles.size(), null, false, StartupOption.REGULAR, + null, null, null, null, false, true, true, null); + long startTime = Time.monotonicNow(); + System.out.println("Waiting for DataNodes to connect to NameNode and init storage directories."); + Set datanodesWithoutFSDataset = new HashSet<>(mc.getDataNodes()); + while (!datanodesWithoutFSDataset.isEmpty()) { + Iterator iter = datanodesWithoutFSDataset.iterator(); + while (iter.hasNext()) { + if (DataNodeTestUtils.getFSDataset(iter.next()) != null) { + iter.remove(); + } + } + Thread.sleep(100); + } + System.out.println("Waited " + (Time.monotonicNow() - startTime) + " ms for DataNode FSDatasets to be ready"); + + for (int dnIndex = 0; dnIndex < blockListFiles.size(); dnIndex++) { + Path blockListFile = blockListFiles.get(dnIndex); + try (FSDataInputStream fsdis = blockListFile.getFileSystem(getConf()).open(blockListFile)) { + BufferedReader reader = new BufferedReader(new InputStreamReader(fsdis)); + List blockList = new ArrayList<>(); + int cnt = 0; + for (String line = 
reader.readLine(); line != null; line = reader.readLine()) { + // Format of the listing files is blockID,blockGenStamp,blockSize + String[] blockInfo = line.split(","); + blockList.add(new Block(Long.parseLong(blockInfo[0]), + Long.parseLong(blockInfo[2]), Long.parseLong(blockInfo[1]))); + cnt++; + } + try { + mc.injectBlocks(dnIndex, blockList, bpid); + } catch (IOException ioe) { + System.out.printf("Error injecting blocks into DataNode %d for block pool %s: %s%n", dnIndex, bpid, + ExceptionUtils.getFullStackTrace(ioe)); + } + System.out.printf("Injected %d blocks into DataNode %d for block pool %s%n", cnt, dnIndex, bpid); + } + } + + } catch (IOException e) { + System.out.println("Error creating DataNodes: " + ExceptionUtils.getFullStackTrace(e)); + return 1; + } + return 0; + } + +} diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/SimulatedMultiStorageFSDataset.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/SimulatedMultiStorageFSDataset.java new file mode 100644 index 0000000000..54cc38bf86 --- /dev/null +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/SimulatedMultiStorageFSDataset.java @@ -0,0 +1,1407 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.linkedin.dynamometer; + +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + +import java.io.File; +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.ChunkChecksum; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; +import org.apache.hadoop.hdfs.server.datanode.Replica; +import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException; +import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; +import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; +import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException; +import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.util.DataChecksum; + + +/** + *

+ * This is a modified version of {@link SimulatedFSDataset} from Hadoop branch-2, because previous versions + * did not support having multiple storages per DataNode, which is very important for obtaining correct + * full block report performance (as there is one report per storage). It is essentially the branch-2 + * {@link SimulatedFSDataset} with HDFS-12818 additionally applied on top of it. + * + * This file is a modified version of the {@link SimulatedFSDataset} class, taken from + * Apache Hadoop 2.7.4. It was originally developed by + * The Apache Software Foundation. + */ +public class SimulatedMultiStorageFSDataset extends SimulatedFSDataset { + static class Factory extends FsDatasetSpi.Factory { + @Override + public SimulatedMultiStorageFSDataset newInstance(DataNode datanode, + DataStorage storage, Configuration conf) throws IOException { + return new SimulatedMultiStorageFSDataset(datanode, conf); + } + + @Override + public boolean isSimulated() { + return true; + } + } + + public static void setFactory(Configuration conf) { + conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY, + Factory.class.getName()); + } + + public static final String CONFIG_PROPERTY_CAPACITY = + "dfs.datanode.simulateddatastorage.capacity"; + + public static final long DEFAULT_CAPACITY = 2L<<40; // 2 terabytes (2L<<40 bytes) + public static final byte DEFAULT_DATABYTE = 9; + + public static final String CONFIG_PROPERTY_STATE = + "dfs.datanode.simulateddatastorage.state"; + private static final DatanodeStorage.State DEFAULT_STATE = + DatanodeStorage.State.NORMAL; + + static final byte[] nullCrcFileData; + static { + DataChecksum checksum = DataChecksum.newDataChecksum( + DataChecksum.Type.NULL, 16*1024 ); + byte[] nullCrcHeader = checksum.getHeader(); + nullCrcFileData = new byte[2 + nullCrcHeader.length]; + nullCrcFileData[0] = (byte) ((BlockMetadataHeader.VERSION >>> 8) & 0xff); + nullCrcFileData[1] = (byte) (BlockMetadataHeader.VERSION & 0xff); + for (int i = 0; i < 
nullCrcHeader.length; i++) { + nullCrcFileData[i+2] = nullCrcHeader[i]; + } + } + + // information about a single block + private class BInfo implements ReplicaInPipelineInterface { + final Block theBlock; + private boolean finalized = false; // if not finalized => ongoing creation + SimulatedOutputStream oStream = null; + private long bytesAcked; + private long bytesRcvd; + private boolean pinned = false; + BInfo(String bpid, Block b, boolean forWriting) throws IOException { + theBlock = new Block(b); + if (theBlock.getNumBytes() < 0) { + theBlock.setNumBytes(0); + } + if (!getStorage(theBlock).alloc(bpid, theBlock.getNumBytes())) { + // expected length - actual length may + // be more - we find out at finalize + DataNode.LOG.warn("Lack of free storage on a block alloc"); + throw new IOException("Creating block, no free space available"); + } + + if (forWriting) { + finalized = false; + oStream = new SimulatedOutputStream(); + } else { + finalized = true; + oStream = null; + } + } + + @Override + public String getStorageUuid() { + return getStorage(theBlock).getStorageUuid(); + } + + @Override + synchronized public long getGenerationStamp() { + return theBlock.getGenerationStamp(); + } + + @Override + synchronized public long getNumBytes() { + if (!finalized) { + return bytesRcvd; + } else { + return theBlock.getNumBytes(); + } + } + + @Override + synchronized public void setNumBytes(long length) { + if (!finalized) { + bytesRcvd = length; + } else { + theBlock.setNumBytes(length); + } + } + + synchronized SimulatedInputStream getIStream() { + if (!finalized) { + // throw new IOException("Trying to read an unfinalized block"); + return new SimulatedInputStream(oStream.getLength(), DEFAULT_DATABYTE); + } else { + return new SimulatedInputStream(theBlock.getNumBytes(), DEFAULT_DATABYTE); + } + } + + synchronized void finalizeBlock(String bpid, long finalSize) + throws IOException { + if (finalized) { + throw new IOException( + "Finalizing a block that has already 
been finalized" + + theBlock.getBlockId()); + } + if (oStream == null) { + DataNode.LOG.error("Null oStream on unfinalized block - bug"); + throw new IOException("Unexpected error on finalize"); + } + + if (oStream.getLength() != finalSize) { + DataNode.LOG.warn("Size passed to finalize (" + finalSize + + ")does not match what was written:" + oStream.getLength()); + throw new IOException( + "Size passed to finalize does not match the amount of data written"); + } + // We had allocated the expected length when block was created; + // adjust if necessary + long extraLen = finalSize - theBlock.getNumBytes(); + if (extraLen > 0) { + if (!getStorage(theBlock).alloc(bpid,extraLen)) { + DataNode.LOG.warn("Lack of free storage on a block alloc"); + throw new IOException("Creating block, no free space available"); + } + } else { + getStorage(theBlock).free(bpid, -extraLen); + } + theBlock.setNumBytes(finalSize); + + finalized = true; + oStream = null; + return; + } + + synchronized void unfinalizeBlock() throws IOException { + if (!finalized) { + throw new IOException("Unfinalized a block that's not finalized " + + theBlock); + } + finalized = false; + oStream = new SimulatedOutputStream(); + long blockLen = theBlock.getNumBytes(); + oStream.setLength(blockLen); + bytesRcvd = blockLen; + bytesAcked = blockLen; + } + + SimulatedInputStream getMetaIStream() { + return new SimulatedInputStream(nullCrcFileData); + } + + synchronized boolean isFinalized() { + return finalized; + } + + @Override + synchronized public ReplicaOutputStreams createStreams(boolean isCreate, + DataChecksum requestedChecksum) throws IOException { + if (finalized) { + throw new IOException("Trying to write to a finalized replica " + + theBlock); + } else { + SimulatedOutputStream crcStream = new SimulatedOutputStream(); + return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum, + getStorage(theBlock).getVolume().isTransientStorage()); + } + } + + @Override + synchronized public long 
getBlockId() { + return theBlock.getBlockId(); + } + + @Override + synchronized public long getVisibleLength() { + return getBytesAcked(); + } + + @Override + public ReplicaState getState() { + return finalized ? ReplicaState.FINALIZED : ReplicaState.RBW; + } + + @Override + synchronized public long getBytesAcked() { + if (finalized) { + return theBlock.getNumBytes(); + } else { + return bytesAcked; + } + } + + @Override + synchronized public void setBytesAcked(long bytesAcked) { + if (!finalized) { + this.bytesAcked = bytesAcked; + } + } + + @Override + public void releaseAllBytesReserved() { + } + + @Override + synchronized public long getBytesOnDisk() { + if (finalized) { + return theBlock.getNumBytes(); + } else { + return oStream.getLength(); + } + } + + @Override + public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) { + oStream.setLength(dataLength); + } + + @Override + public ChunkChecksum getLastChecksumAndDataLen() { + return new ChunkChecksum(oStream.getLength(), null); + } + + @Override + public boolean isOnTransientStorage() { + return false; + } + } + + /** + * Class is used for tracking block pool storage + */ + private static class SimulatedBPStorage { + private long used; // in bytes + private final Map blockMap = new HashMap<>(); + + long getUsed() { + return used; + } + + void alloc(long amount) { + used += amount; + } + + void free(long amount) { + used -= amount; + } + + Map getBlockMap() { + return blockMap; + } + + SimulatedBPStorage() { + used = 0; + } + } + + /** + * Class used for tracking datanode level storage + */ + private static class SimulatedStorage { + private final Map map = + new HashMap(); + + private final long capacity; // in bytes + private final DatanodeStorage dnStorage; + private final SimulatedVolume volume; + + synchronized long getFree() { + return capacity - getUsed(); + } + + long getCapacity() { + return capacity; + } + + synchronized long getUsed() { + long used = 0; + for (SimulatedBPStorage 
bpStorage : map.values()) { + used += bpStorage.getUsed(); + } + return used; + } + + synchronized long getBlockPoolUsed(String bpid) throws IOException { + return getBPStorage(bpid).getUsed(); + } + + int getNumFailedVolumes() { + return 0; + } + + synchronized boolean alloc(String bpid, long amount) throws IOException { + if (getFree() >= amount) { + getBPStorage(bpid).alloc(amount); + return true; + } + return false; + } + + synchronized void free(String bpid, long amount) throws IOException { + getBPStorage(bpid).free(amount); + } + + SimulatedStorage(long cap, DatanodeStorage.State state) { + capacity = cap; + dnStorage = new DatanodeStorage( + "SimulatedStorage-" + DatanodeStorage.generateUuid(), + state, StorageType.DEFAULT); + this.volume = new SimulatedVolume(this); + } + + synchronized void addBlockPool(String bpid) { + SimulatedBPStorage bpStorage = map.get(bpid); + if (bpStorage != null) { + return; + } + map.put(bpid, new SimulatedBPStorage()); + } + + synchronized void removeBlockPool(String bpid) { + map.remove(bpid); + } + + private SimulatedBPStorage getBPStorage(String bpid) throws IOException { + SimulatedBPStorage bpStorage = map.get(bpid); + if (bpStorage == null) { + throw new IOException("block pool " + bpid + " not found"); + } + return bpStorage; + } + + String getStorageUuid() { + return dnStorage.getStorageID(); + } + + DatanodeStorage getDnStorage() { + return dnStorage; + } + + synchronized StorageReport getStorageReport(String bpid) { + return new StorageReport(dnStorage, + false, getCapacity(), getUsed(), getFree(), + map.get(bpid).getUsed(), 0L); + } + + SimulatedVolume getVolume() { + return volume; + } + + Map getBlockMap(String bpid) throws IOException { + SimulatedBPStorage bpStorage = map.get(bpid); + if (bpStorage == null) { + throw new IOException("Nonexistent block pool: " + bpid); + } + return bpStorage.getBlockMap(); + } + } + + static class SimulatedVolume implements FsVolumeSpi { + private final SimulatedStorage storage; 
+ + SimulatedVolume(final SimulatedStorage storage) { + this.storage = storage; + } + + @Override + public FsVolumeReference obtainReference() throws ClosedChannelException { + return null; + } + + @Override + public String getStorageID() { + return storage.getStorageUuid(); + } + + @Override + public String[] getBlockPoolList() { + return new String[0]; + } + + @Override + public long getAvailable() throws IOException { + return storage.getCapacity() - storage.getUsed(); + } + + @Override + public String getBasePath() { + return null; + } + + @Override + public String getPath(String bpid) throws IOException { + return null; + } + + @Override + public File getFinalizedDir(String bpid) throws IOException { + return null; + } + + @Override + public StorageType getStorageType() { + return null; + } + + @Override + public boolean isTransientStorage() { + return false; + } + + @Override + public void reserveSpaceForRbw(long bytesToReserve) { + } + + @Override + public void releaseReservedSpace(long bytesToRelease) { + } + + @Override + public BlockIterator newBlockIterator(String bpid, String name) { + throw new UnsupportedOperationException(); + } + + @Override + public BlockIterator loadBlockIterator(String bpid, String name) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FsDatasetSpi getDataset() { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] loadLastPartialChunkChecksum( + File blockFile, File metaFile) throws IOException { + return null; + } + } + + private final List storages; + private final String datanodeUuid; + private final DataNode datanode; + + public SimulatedMultiStorageFSDataset(DataNode datanode, Configuration conf) { + super(datanode, null, conf); + this.datanode = datanode; + int storageCount = DataNode.getStorageLocations(conf).size(); + this.datanodeUuid = "SimulatedDatanode-" + DataNode.generateUuid(); + + this.storages = new ArrayList<>(); + for (int i = 0; i < 
storageCount; i++) { + this.storages.add(new SimulatedStorage( + conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY), + conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE))); + } + } + + public synchronized void injectBlocks(String bpid, + Iterable injectBlocks) throws IOException { + ExtendedBlock blk = new ExtendedBlock(); + if (injectBlocks != null) { + for (Block b: injectBlocks) { // if any blocks in list is bad, reject list + if (b == null) { + throw new NullPointerException("Null blocks in block list"); + } + blk.set(bpid, b); + if (isValidBlock(blk)) { + DataNode.LOG.error("Block already exists in block list; trying to add <" + + b + "> but already have <" + getBlockMap(blk).get(blk.getLocalBlock()).theBlock + ">; skipping"); + } + } + + List> blockMaps = new ArrayList<>(); + for (SimulatedStorage storage : storages) { + storage.addBlockPool(bpid); + blockMaps.add(storage.getBlockMap(bpid)); + } + + for (Block b: injectBlocks) { + BInfo binfo = new BInfo(bpid, b, false); + blockMaps.get((int) (b.getBlockId() % storages.size())).put(binfo.theBlock, binfo); + } + } + } + + /** Get the storage that a given block lives within. */ + private SimulatedStorage getStorage(Block b) { + return storages.get((int) (b.getBlockId() % storages.size())); + } + + /** + * Get the block map that a given block lives within, assuming it is within + * block pool bpid. + */ + private Map getBlockMap(Block b, String bpid) + throws IOException { + return getStorage(b).getBlockMap(bpid); + } + + /** Get the block map that a given block lives within. 
*/ + private Map getBlockMap(ExtendedBlock b) throws IOException { + return getBlockMap(b.getLocalBlock(), b.getBlockPoolId()); + } + + @Override // FsDatasetSpi + public synchronized void finalizeBlock(ExtendedBlock b, boolean fsyncDir) + throws IOException { + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); + if (binfo == null) { + throw new IOException("Finalizing a non existing block " + b); + } + binfo.finalizeBlock(b.getBlockPoolId(), b.getNumBytes()); + } + + @Override // FsDatasetSpi + public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException{ + if (isValidRbw(b)) { + getBlockMap(b).remove(b.getLocalBlock()); + } + } + + synchronized BlockListAsLongs getBlockReport(String bpid, + SimulatedStorage storage) { + BlockListAsLongs.Builder report = BlockListAsLongs.builder(); + try { + for (BInfo b : storage.getBlockMap(bpid).values()) { + if (b.isFinalized()) { + report.add(b); + } + } + } catch (IOException ioe) { + // Ignore + } + return report.build(); + } + + @Override + public synchronized Map getBlockReports( + String bpid) { + Map blockReports = new HashMap<>(); + for (SimulatedStorage storage : storages) { + blockReports.put(storage.getDnStorage(), getBlockReport(bpid, storage)); + } + return blockReports; + } + + @Override // FsDatasetSpi + public List getCacheReport(String bpid) { + return new LinkedList(); + } + + @Override // FSDatasetMBean + public long getCapacity() { + long total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getCapacity(); + } + return total; + } + + @Override // FSDatasetMBean + public long getDfsUsed() { + long total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getUsed(); + } + return total; + } + + @Override // FSDatasetMBean + public long getBlockPoolUsed(String bpid) throws IOException { + long total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getBlockPoolUsed(bpid); + } + return total; + } + + @Override // FSDatasetMBean + 
public long getRemaining() { + + long total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getFree(); + } + return total; + } + + @Override // FSDatasetMBean + public int getNumFailedVolumes() { + + int total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getNumFailedVolumes(); + } + return total; + } + + @Override // FSDatasetMBean + public String[] getFailedStorageLocations() { + return null; + } + + @Override // FSDatasetMBean + public long getLastVolumeFailureDate() { + return 0; + } + + @Override // FSDatasetMBean + public long getEstimatedCapacityLostTotal() { + return 0; + } + + @Override // FsDatasetSpi + public VolumeFailureSummary getVolumeFailureSummary() { + return new VolumeFailureSummary(ArrayUtils.EMPTY_STRING_ARRAY, 0, 0); + } + + @Override // FSDatasetMBean + public long getCacheUsed() { + return 0l; + } + + @Override // FSDatasetMBean + public long getCacheCapacity() { + return 0l; + } + + @Override // FSDatasetMBean + public long getNumBlocksCached() { + return 0l; + } + + @Override + public long getNumBlocksFailedToCache() { + return 0l; + } + + @Override + public long getNumBlocksFailedToUncache() { + return 0l; + } + + @Override // FsDatasetSpi + public synchronized long getLength(ExtendedBlock b) throws IOException { + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); + if (binfo == null) { + throw new IOException("Finalizing a non existing block " + b); + } + return binfo.getNumBytes(); + } + + @Override + @Deprecated + public Replica getReplica(String bpid, long blockId) { + Block b = new Block(blockId); + try { + return getBlockMap(b, bpid).get(b); + } catch (IOException ioe) { + return null; + } + } + + @Override + public synchronized String getReplicaString(String bpid, long blockId) { + Replica r = null; + try { + Block b = new Block(blockId); + r = getBlockMap(b, bpid).get(b); + } catch (IOException ioe) { + // Ignore + } + return r == null? 
"null": r.toString(); + } + + @Override // FsDatasetSpi + public Block getStoredBlock(String bpid, long blkid) throws IOException { + Block b = new Block(blkid); + try { + BInfo binfo = getBlockMap(b, bpid).get(b); + if (binfo == null) { + return null; + } + return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes()); + } catch (IOException ioe) { + return null; + } + } + + @Override // FsDatasetSpi + public synchronized void invalidate(String bpid, Block[] invalidBlks) + throws IOException { + boolean error = false; + if (invalidBlks == null) { + return; + } + for (Block b: invalidBlks) { + if (b == null) { + continue; + } + Map map = getBlockMap(b, bpid); + BInfo binfo = map.get(b); + if (binfo == null) { + error = true; + DataNode.LOG.warn("Invalidate: Missing block"); + continue; + } + getStorage(b).free(bpid, binfo.getNumBytes()); + map.remove(b); + if (datanode != null) { + datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, b), + binfo.getStorageUuid()); + } + } + if (error) { + throw new IOException("Invalidate: Missing blocks."); + } + } + + @Override // FSDatasetSpi + public void cache(String bpid, long[] cacheBlks) { + throw new UnsupportedOperationException( + "SimulatedMultiStorageFSDataset does not support cache operation!"); + } + + @Override // FSDatasetSpi + public void uncache(String bpid, long[] uncacheBlks) { + throw new UnsupportedOperationException( + "SimulatedMultiStorageFSDataset does not support uncache operation!"); + } + + @Override // FSDatasetSpi + public boolean isCached(String bpid, long blockId) { + return false; + } + + private BInfo getBInfo(final ExtendedBlock b) { + try { + return getBlockMap(b).get(b.getLocalBlock()); + } catch (IOException ioe) { + return null; + } + } + + @Override // {@link FsDatasetSpi} + public boolean contains(ExtendedBlock block) { + return getBInfo(block) != null; + } + + /** + * Check if a block is valid. + * + * @param b The block to check. 
+ * @param minLength The minimum length that the block must have. May be 0. + * @param state If this is null, it is ignored. If it is non-null, we + * will check that the replica has this state. + * + * @throws ReplicaNotFoundException If the replica is not found + * + * @throws UnexpectedReplicaStateException If the replica is not in the + * expected state. + */ + @Override // {@link FsDatasetSpi} + public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state) + throws ReplicaNotFoundException, UnexpectedReplicaStateException { + final BInfo binfo = getBInfo(b); + + if (binfo == null) { + throw new ReplicaNotFoundException(b); + } + if ((state == ReplicaState.FINALIZED && !binfo.isFinalized()) || + (state != ReplicaState.FINALIZED && binfo.isFinalized())) { + throw new UnexpectedReplicaStateException(b, state); + } + } + + @Override // FsDatasetSpi + public synchronized boolean isValidBlock(ExtendedBlock b) { + try { + checkBlock(b, 0, ReplicaState.FINALIZED); + } catch (IOException e) { + return false; + } + return true; + } + + /* check if a block is created but not finalized */ + @Override + public synchronized boolean isValidRbw(ExtendedBlock b) { + try { + checkBlock(b, 0, ReplicaState.RBW); + } catch (IOException e) { + return false; + } + return true; + } + + @Override + public String toString() { + return getStorageInfo(); + } + + @Override // FsDatasetSpi + public synchronized ReplicaHandler append( + ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); + if (binfo == null || !binfo.isFinalized()) { + throw new ReplicaNotFoundException("Block " + b + + " is not valid, and cannot be appended to."); + } + binfo.unfinalizeBlock(); + return new ReplicaHandler(binfo, null); + } + + @Override // FsDatasetSpi + public synchronized ReplicaHandler recoverAppend( + ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { + final Map<Block, BInfo> map = getBlockMap(b); + BInfo 
binfo = map.get(b.getLocalBlock()); + if (binfo == null) { + throw new ReplicaNotFoundException("Block " + b + + " is not valid, and cannot be appended to."); + } + if (binfo.isFinalized()) { + binfo.unfinalizeBlock(); + } + map.remove(b.getLocalBlock()); + binfo.theBlock.setGenerationStamp(newGS); + map.put(binfo.theBlock, binfo); + return new ReplicaHandler(binfo, null); + } + + @Override // FsDatasetSpi + public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) + throws IOException { + final Map<Block, BInfo> map = getBlockMap(b); + BInfo binfo = map.get(b.getLocalBlock()); + if (binfo == null) { + throw new ReplicaNotFoundException("Block " + b + + " is not valid, and cannot be appended to."); + } + if (!binfo.isFinalized()) { + binfo.finalizeBlock(b.getBlockPoolId(), binfo.getNumBytes()); + } + map.remove(b.getLocalBlock()); + binfo.theBlock.setGenerationStamp(newGS); + map.put(binfo.theBlock, binfo); + return binfo; + } + + @Override // FsDatasetSpi + public synchronized ReplicaHandler recoverRbw( + ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) + throws IOException { + final Map<Block, BInfo> map = getBlockMap(b); + BInfo binfo = map.get(b.getLocalBlock()); + if (binfo == null) { + throw new ReplicaNotFoundException("Block " + b + + " does not exist, and cannot be appended to."); + } + if (binfo.isFinalized()) { + throw new ReplicaAlreadyExistsException("Block " + b + + " is valid, and cannot be written to."); + } + map.remove(b.getLocalBlock()); + binfo.theBlock.setGenerationStamp(newGS); + map.put(binfo.theBlock, binfo); + return new ReplicaHandler(binfo, null); + } + + @Override // FsDatasetSpi + public synchronized ReplicaHandler createRbw( + StorageType storageType, ExtendedBlock b, + boolean allowLazyPersist) throws IOException { + return createTemporary(storageType, b, false); + } + + @Override // FsDatasetSpi + public synchronized ReplicaHandler createTemporary( + StorageType storageType, ExtendedBlock b, boolean isTransfer) throws IOException { + if 
(isValidBlock(b)) { + throw new ReplicaAlreadyExistsException("Block " + b + + " is valid, and cannot be written to."); + } + if (isValidRbw(b)) { + throw new ReplicaAlreadyExistsException("Block " + b + + " is being written, and cannot be written to."); + } + BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true); + getBlockMap(b).put(binfo.theBlock, binfo); + return new ReplicaHandler(binfo, null); + } + + synchronized InputStream getBlockInputStream(ExtendedBlock b + ) throws IOException { + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); + if (binfo == null) { + throw new IOException("No such Block " + b ); + } + + return binfo.getIStream(); + } + + @Override // FsDatasetSpi + public synchronized InputStream getBlockInputStream(ExtendedBlock b, + long seekOffset) throws IOException { + InputStream result = getBlockInputStream(b); + IOUtils.skipFully(result, seekOffset); + return result; + } + + /** Not supported */ + @Override // FsDatasetSpi + public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, + long ckoff) throws IOException { + throw new IOException("Not supported"); + } + + @Override // FsDatasetSpi + public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b + ) throws IOException { + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); + if (binfo == null) { + throw new IOException("No such Block " + b ); + } + if (!binfo.finalized) { + throw new IOException("Block " + b + + " is being written, its meta cannot be read"); + } + final SimulatedInputStream sin = binfo.getMetaIStream(); + return new LengthInputStream(sin, sin.getLength()); + } + + @Override + public Set checkDataDir() { + // nothing to check for simulated data set + return null; + } + + @Override // FsDatasetSpi + public synchronized void adjustCrcChannelPosition(ExtendedBlock b, + ReplicaOutputStreams stream, + int checksumSize) + throws IOException { + } + + /** + * Simulated input and output streams + * + */ + static private 
class SimulatedInputStream extends java.io.InputStream { + + + byte theRepeatedData = 7; + final long length; // bytes + int currentPos = 0; + byte[] data = null; + + /** + * An input stream of size l with repeated bytes + * @param l size of the stream + * @param iRepeatedData byte that is repeated in the stream + */ + SimulatedInputStream(long l, byte iRepeatedData) { + length = l; + theRepeatedData = iRepeatedData; + } + + /** + * An input stream of the supplied data + * @param iData data to construct the stream + */ + SimulatedInputStream(byte[] iData) { + data = iData; + length = data.length; + } + + /** + * @return the length of the input stream + */ + long getLength() { + return length; + } + + @Override + public int read() throws IOException { + if (currentPos >= length) + return -1; + if (data != null) { + return data[currentPos++] & 0xFF; + } else { + currentPos++; + return theRepeatedData & 0xFF; + } + } + + @Override + public int read(byte[] b) throws IOException { + + if (b == null) { + throw new NullPointerException(); + } + if (b.length == 0) { + return 0; + } + if (currentPos >= length) { // EOF + return -1; + } + int bytesRead = (int) Math.min(b.length, length-currentPos); + if (data != null) { + System.arraycopy(data, currentPos, b, 0, bytesRead); + } else { // no backing array; fill with the repeated byte + for (int i = 0; i < bytesRead; i++) { + b[i] = theRepeatedData; + } + } + currentPos += bytesRead; + return bytesRead; + } + } + + /** + * This class implements an output stream that merely throws its data away, but records its + * length. + * + */ + static private class SimulatedOutputStream extends OutputStream { + long length = 0; + + /** + * Constructor for SimulatedOutputStream + */ + SimulatedOutputStream() { + } + + /** + * + * @return the length of the data created so far. 
+ */ + long getLength() { + return length; + } + + /** + */ + void setLength(long length) { + this.length = length; + } + + @Override + public void write(int arg0) throws IOException { + length++; + } + + @Override + public void write(byte[] b) throws IOException { + length += b.length; + } + + @Override + public void write(byte[] b, + int off, + int len) throws IOException { + length += len; + } + } + + private ObjectName mbeanName; + + + + /** + * Register the FSDataset MBean using the name + * "hadoop:service=DataNode,name=FSDatasetState-" + * We use storage id for MBean name since a minicluster within a single + * Java VM may have multiple Simulated Datanodes. + */ + void registerMBean(final String storageId) { + // We wrap to bypass the standard MBean naming convention. + // This wrapping can be removed in Java 6 as it is more flexible in + // package naming for MBeans and their impl. + StandardMBean bean; + + try { + bean = new StandardMBean(this,FSDatasetMBean.class); + mbeanName = MBeans.register("DataNode", "FSDatasetState-"+ + storageId, bean); + } catch (NotCompliantMBeanException e) { + DataNode.LOG.warn("Error registering FSDatasetState MBean", e); + } + + DataNode.LOG.info("Registered FSDatasetState MBean"); + } + + @Override + public void shutdown() { + if (mbeanName != null) MBeans.unregister(mbeanName); + } + + @Override + public String getStorageInfo() { + return "Simulated FSDataset-" + datanodeUuid; + } + + @Override + public boolean hasEnoughResource() { + return true; + } + + @Override + public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) + throws IOException { + ExtendedBlock b = rBlock.getBlock(); + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); + if (binfo == null) { + throw new IOException("No such Block " + b ); + } + + return new ReplicaRecoveryInfo(binfo.getBlockId(), binfo.getBytesOnDisk(), + binfo.getGenerationStamp(), + binfo.isFinalized()?ReplicaState.FINALIZED : ReplicaState.RBW); + } + + @Override // 
FsDatasetSpi + public Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock, + long recoveryId, + long newBlockId, + long newlength) throws IOException { + return getBInfo(oldBlock); + } + + @Override // FsDatasetSpi + public long getReplicaVisibleLength(ExtendedBlock block) { + return block.getNumBytes(); + } + + @Override // FsDatasetSpi + public void addBlockPool(String bpid, Configuration conf) { + for (SimulatedStorage storage : storages) { + storage.addBlockPool(bpid); + } + } + + @Override // FsDatasetSpi + public void shutdownBlockPool(String bpid) { + for (SimulatedStorage storage : storages) { + storage.removeBlockPool(bpid); + } + } + + @Override // FsDatasetSpi + public void deleteBlockPool(String bpid, boolean force) { + return; + } + + @Override + public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary) + throws IOException { + final BInfo r = getBlockMap(temporary).get(temporary.getLocalBlock()); + if (r == null) { + throw new IOException("Block not found, temporary=" + temporary); + } else if (r.isFinalized()) { + throw new IOException("Replica already finalized, temporary=" + + temporary + ", r=" + r); + } + return r; + } + + @Override + public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) { + throw new UnsupportedOperationException(); + } + + @Override + public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void enableTrash(String bpid) { + throw new UnsupportedOperationException(); + } + + @Override + public void clearTrash(String bpid) { + } + + @Override + public boolean trashEnabled(String bpid) { + return false; + } + + @Override + public void setRollingUpgradeMarker(String bpid) { + } + + @Override + public void clearRollingUpgradeMarker(String bpid) { + } + + @Override + public void checkAndUpdate(String bpid, long blockId, File diskFile, + File diskMetaFile, FsVolumeSpi vol) throws 
IOException { + throw new UnsupportedOperationException(); + } + + @Override + public List<FsVolumeSpi> getVolumes() { + throw new UnsupportedOperationException(); + } + + @Override + public void addVolume( + final StorageLocation location, + final List<NamespaceInfo> nsInfos) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public DatanodeStorage getStorage(final String storageUuid) { + for (SimulatedStorage storage : storages) { + if (storageUuid.equals(storage.getStorageUuid())) { + return storage.getDnStorage(); + } + } + return null; + } + + @Override + public StorageReport[] getStorageReports(String bpid) { + List<StorageReport> reports = new ArrayList<>(); + for (SimulatedStorage storage : storages) { + reports.add(storage.getStorageReport(bpid)); + } + return reports.toArray(new StorageReport[0]); + } + + @Override + public List<FinalizedReplica> getFinalizedBlocks(String bpid) { + throw new UnsupportedOperationException(); + } + + @Override + public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) { + throw new UnsupportedOperationException(); + } + + @Override + public Map<String, Object> getVolumeInfoMap() { + throw new UnsupportedOperationException(); + } + + @Override + public FsVolumeSpi getVolume(ExtendedBlock b) { + return getStorage(b.getLocalBlock()).getVolume(); + } + + @Override + public synchronized void removeVolumes(Set<File> volumes, boolean clearFailure) { + throw new UnsupportedOperationException(); + } + + @Override + public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, + FileDescriptor fd, long offset, long nbytes, int flags) { + throw new UnsupportedOperationException(); + } + + @Override + public void onCompleteLazyPersist(String bpId, long blockId, + long creationTime, File[] savedFiles, FsVolumeSpi targetVolume) { + throw new UnsupportedOperationException(); + } + + @Override + public void onFailLazyPersist(String bpId, long blockId) { + throw new UnsupportedOperationException(); + } + + @Override + public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock 
block, + StorageType targetStorageType) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setPinning(ExtendedBlock b) throws IOException { + getBlockMap(b).get(b.getLocalBlock()).pinned = true; + } + + @Override + public boolean getPinning(ExtendedBlock b) throws IOException { + return getBlockMap(b).get(b.getLocalBlock()).pinned; + } + + @Override + public boolean isDeletingBlock(String bpid, long blockId) { + throw new UnsupportedOperationException(); + } +} + + diff --git a/dynamometer-infra/src/main/resources/scripts/create-dn-dir.sh b/dynamometer-infra/src/main/resources/scripts/create-dn-dir.sh deleted file mode 100644 index 651e414c44..0000000000 --- a/dynamometer-infra/src/main/resources/scripts/create-dn-dir.sh +++ /dev/null @@ -1,141 +0,0 @@ -#!/usr/bin/env bash -# -# Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. -# See LICENSE in the project root for license information. -# - -# Script for creating Dynamometer DataNode directories within YARN container. -# Usage: ./create-dn-dir.sh data_dir_list -# data_dir_list is a comma-separated list of data directories to populate - -dataDirs="$1" -# Strip off file:// prefix if present; convert to array -dataDirs=${dataDirs//file:\/\//} -dataDirs=(${dataDirs//,/ }) - -layoutVersion=$(${HADOOP_HOME}/bin/hadoop jar dynamometer.jar \ - com.linkedin.dynamometer.DataNodeLayoutVersionFetcher) -if [ "$layoutVersion" = -56 ]; then - layoutDirectoryMax=255 -elif [ "$layoutVersion" = -57 ]; then - layoutDirectoryMax=31 -else - echo "Invalid/unsupported layout version: $layoutVersion" - exit 1 -fi - -# We call this periodically so that if the application is killed before this completes, -# the script will exit itself rather than continuing to create new blocks -parentPID=$PPID -function exitIfParentIsDead() { - if ! 
kill -0 ${parentPID} 2>/dev/null; then - echo "Parent process died; exiting from ${0##*/}" - exit 1 - fi -} - -set -m - -metafile=`pwd`/scripts/metafile -versionFile=`pwd`/VERSION - -bpId=`cat ${versionFile} | grep blockpoolID | awk -F\= '{print $2}'` -clusterId=`cat ${versionFile} | grep clusterID | awk -F\= '{print $2}'` -namespaceID=`cat ${versionFile} | grep namespaceID | awk -F\= '{print $2}'` - -chmod 644 ${metafile} - -dnUUID=$(uuidgen) - -for dataDir in ${dataDirs[@]}; do - mkdir -p "$dataDir/current" - cat > "$dataDir/current/VERSION" << EOF -clusterID=${clusterId} -namespaceID=${namespaceID} -cTime=0 -blockpoolID=${bpId} -datanodeUuid=${dnUUID} -storageID=DS-$(uuidgen) -storageType=DATA_NODE -layoutVersion=${layoutVersion} -EOF - bpDir="$dataDir/current/$bpId/current" - mkdir -p ${bpDir} - mkdir ${bpDir}/rbw - cp "$dataDir/current/VERSION" ${bpDir}/VERSION - finDir=${bpDir}/finalized - mkdir ${finDir} - - for p in `seq 0 ${layoutDirectoryMax}`; do - mkdir_args=() - for q in `seq 0 ${layoutDirectoryMax}`; do - mkdir_args[$((q+1))]="${finDir}/subdir${p}/subdir${q}" - done - mkdir -p "${mkdir_args[@]}" - exitIfParentIsDead - done -done - -exitIfParentIsDead - -while [ 1 ]; do fg 2> /dev/null; [ $? 
== 1 ] && break; done - -exitIfParentIsDead - -blkList=`pwd`/block -blkIdx=0 - -blkListSplitDir="`pwd`/blkListSplit" -mkdir -p "$blkListSplitDir" - -max_parallelism=100 -lines_per_split=5000 -blkCnt=`wc -l "$blkList" | awk '{ print $1 }'` -if [ ${blkCnt} -gt $((max_parallelism * lines_per_split)) ]; then - lines_per_split=$((blkCnt / max_parallelism)) -fi - -echo "Beginning to create $blkCnt blocks using $lines_per_split blocks per process" - -split -a 3 -l "$lines_per_split" "$blkList" "$blkListSplitDir/blkList" - -function read_split() { - splitFile="$1" - blkIdx=0 - - while read blkInfo; do - dataDirIdx=$((blkIdx % ${#dataDirs[@]})) - dataDir=${dataDirs[$dataDirIdx]}/current/${bpId}/current/finalized - - blkInfoArray=(${blkInfo//,/ }) - blkId=${blkInfoArray[0]} - blkGs=${blkInfoArray[1]} - blkSz=${blkInfoArray[2]} - - d1=$(( (blkId>>16) & layoutDirectoryMax )) - d2=$(( (blkId>>8) & layoutDirectoryMax )) - - blkDir=${dataDir}/subdir${d1}/subdir${d2} - - truncate -s ${blkSz} ${blkDir}/blk_${blkId} - ln -s ${metafile} ${blkDir}/blk_${blkId}_${blkGs}.meta - - blkIdx=$((blkIdx+1)) - - if [ $((blkIdx % 1000)) == 0 ]; then - echo "For split file '$splitFile', created $blkIdx of $lines_per_split blocks" - exitIfParentIsDead - fi - - done < "$splitFile" -} - -for splitFile in "$blkListSplitDir/blkList"*; do - read_split "$splitFile" & -done - -while [ 1 ]; do fg 2> /dev/null; [ $? 
== 1 ] && break; done - -echo "Finished creating $blkCnt blocks" - -rm -rf "$blkListSplitDir" \ No newline at end of file diff --git a/dynamometer-infra/src/main/resources/scripts/metafile b/dynamometer-infra/src/main/resources/scripts/metafile deleted file mode 100644 index 6f220b46b97dfdaec060d6e1bf46f8dd9c22637c..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 11 NcmZQzWN=`B0006N073u& diff --git a/dynamometer-infra/src/main/resources/scripts/start-component.sh b/dynamometer-infra/src/main/resources/start-component.sh similarity index 80% rename from dynamometer-infra/src/main/resources/scripts/start-component.sh rename to dynamometer-infra/src/main/resources/start-component.sh index 1c596424fd..ebdb0aece5 100755 --- a/dynamometer-infra/src/main/resources/scripts/start-component.sh +++ b/dynamometer-infra/src/main/resources/start-component.sh @@ -8,14 +8,17 @@ # USAGE: # ./start-component.sh namenode hdfs_storage # OR -# ./start-component.sh datanode nn_servicerpc_address +# ./start-component.sh datanode nn_servicerpc_address sleep_time_sec # First parameter should be component being launched, either `datanode` or `namenode` # If component is namenode, hdfs_storage is expected to point to a location to # write out shared files such as the file containing the information about # which ports the NN started on (at nn_info.prop) and the namenode's metrics # (at namenode_metrics) -# If component is datanode, nn_servicerpc_address is expected to point to -# servicerpc address of the namenode +# If component is datanode, nn_servicerpc_address is expected to point to the +# servicerpc address of the namenode. sleep_time_sec is the amount of time that +# should be allowed to elapse before launching anything. The +# `com.linkedin.dynamometer.SimulatedDataNodes` class will be used to start multiple +# DataNodes within the same JVM, and they will store their block files in memory. 
component="$1" if [[ "$component" != "datanode" && "$component" != "namenode" ]]; then @@ -29,11 +32,12 @@ if [ "$component" = "namenode" ]; then fi hdfsStoragePath="$2" else - if [ $# -lt 2 ]; then + if [ $# -lt 3 ]; then echo "Not enough arguments for DataNode" exit 1 fi nnServiceRpcAddress="$2" + launchDelaySec="$3" fi containerID=${CONTAINER_ID##*_} @@ -142,33 +146,49 @@ if [ "$component" = "datanode" ]; then done dataDirs=${dataDirs:1} - ./scripts/create-dn-dir.sh "$dataDirs" - if [ $? -ne 0 ]; then - echo "Unable to create datanode directories" - exit 1 - fi - - ipcPort=`find_available_port "$baseIpcPort"` - httpPort=`find_available_port "$baseHttpPort"` - serverPort=`find_available_port "$baseServerPort"` - - read -r -d '' datanodeConfigs </dev/null; then + echo "Parent process ($PPID) exited while waiting; now exiting" + exit 0 + fi + done + + versionFile="`pwd`/VERSION" + bpId=`cat "${versionFile}" | grep blockpoolID | awk -F\= '{print $2}'` + listingFiles=() + blockDir="`pwd`/blocks" + for listingFile in ${blockDir}/*; do + listingFiles+=("file://${listingFile}") + done + + localHostname=`hostname` + + read -r -d '' datanodeClusterConfigs <