From 53d0a4be49e6704b068ce42adcb6c2755de549d8 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Tue, 2 Jan 2018 16:16:30 -0800 Subject: [PATCH] Closes #37. Add in the ability to trigger block reports on DataNodes that haven't reported everything yet. --- .../dynamometer/ApplicationMaster.java | 2 +- .../java/com/linkedin/dynamometer/Client.java | 3 +- .../linkedin/dynamometer/DynoInfraUtils.java | 155 +++++++++++++++++- .../dynamometer/TestDynamometerInfra.java | 2 +- .../dynamometer/TestDynoInfraUtils.java | 52 ++++++ 5 files changed, 210 insertions(+), 4 deletions(-) create mode 100644 dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynoInfraUtils.java diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java index eff89ce1bc..81cc80e12e 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java @@ -340,7 +340,7 @@ public Boolean get() { LOG.info("Finished requesting datanode containers"); if (launchNameNode) { - DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), numTotalDataNodes, exitCritera, LOG); + DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), numTotalDataNodes, true, exitCritera, LOG); } waitForCompletion(); diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index 3073ee9662..dd3f7a4489 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -744,7 +744,8 @@ public Boolean get() { } } DynoInfraUtils.waitForNameNodeStartup(namenodeProperties.get(), exitCritera, LOG); - DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), numTotalDataNodes, exitCritera, LOG); + DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), numTotalDataNodes, false, + exitCritera, LOG); break; } catch (IOException ioe) { LOG.error("Unexpected exception while waiting for NameNode readiness", ioe); diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoInfraUtils.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoInfraUtils.java index 1218e5325b..856595824b 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoInfraUtils.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoInfraUtils.java @@ -4,6 +4,7 @@ */ package com.linkedin.dynamometer; +import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Supplier; import java.io.File; @@ -11,17 +12,32 @@ import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; +import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashSet; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.client.BlockReportOptions; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonParser; @@ -43,11 +59,13 @@ public class DynoInfraUtils { public static final String NAMENODE_STARTUP_PROGRESS_JMX_QUERY = "Hadoop:service=NameNode,name=StartupProgress"; public static final String FSNAMESYSTEM_JMX_QUERY = "Hadoop:service=NameNode,name=FSNamesystem"; public static final String FSNAMESYSTEM_STATE_JMX_QUERY = "Hadoop:service=NameNode,name=FSNamesystemState"; + public static final String NAMENODE_INFO_JMX_QUERY = "Hadoop:service=NameNode,name=NameNodeInfo"; // The JMX property names of various properties. public static final String JMX_MISSING_BLOCKS = "MissingBlocks"; public static final String JMX_UNDER_REPLICATED_BLOCKS = "UnderReplicatedBlocks"; public static final String JMX_BLOCKS_TOTAL = "BlocksTotal"; public static final String JMX_LIVE_NODE_COUNT = "NumLiveDataNodes"; + public static final String JMX_LIVE_NODES_LIST = "LiveNodes"; /** * If a file matching {@value HADOOP_TAR_FILENAME_FORMAT} and {@code version} is @@ -179,7 +197,8 @@ static void waitForNameNodeStartup(Properties nameNodeProperties, Supplier shouldExit, final Log log) throws IOException, InterruptedException { + boolean triggerBlockReports, Supplier shouldExit, final Log log) + throws IOException, InterruptedException { if (shouldExit.get()) { return; } @@ -189,6 +208,74 @@ static void waitForNameNodeReadiness(final Properties nameNodeProperties, int nu numTotalDataNodes*0.99, numTotalDataNodes*0.001, false, nameNodeProperties, shouldExit, log); final int totalBlocks = Integer.parseInt(fetchNameNodeJMXValue(nameNodeProperties, FSNAMESYSTEM_STATE_JMX_QUERY, JMX_BLOCKS_TOTAL)); + Thread blockReportThread = null; + if (triggerBlockReports) { + // This will be significantly lower than the actual expected number of blocks because it does not + // take into account replication factor. However the block reports are pretty binary; either a full + // report has been received or it hasn't. Thus we don't mind the large underestimate here. + final int blockThreshold = totalBlocks / numTotalDataNodes * 2; + // The Configuration object here is based on the host cluster, which may + // have security enabled; we need to disable it to talk to the Dyno NN + final Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "false"); + final DistributedFileSystem dfs = + (DistributedFileSystem) FileSystem.get(getNameNodeHdfsUri(nameNodeProperties), conf); + log.info("Launching thread to trigger block reports for Datanodes with <" + blockThreshold + " blocks reported"); + blockReportThread = new Thread() { + @Override + public void run() { + // Here we count both Missing and UnderReplicated within under replicated + long lastUnderRepBlocks = Long.MAX_VALUE; + try { + while (!this.isInterrupted()) { + try { + Thread.sleep(TimeUnit.MINUTES.toMillis(1)); + long underRepBlocks = Long.parseLong( + fetchNameNodeJMXValue(nameNodeProperties, FSNAMESYSTEM_JMX_QUERY, JMX_MISSING_BLOCKS)) + + Long.parseLong(fetchNameNodeJMXValue(nameNodeProperties, FSNAMESYSTEM_STATE_JMX_QUERY, + JMX_UNDER_REPLICATED_BLOCKS)); + long blockDecrease = lastUnderRepBlocks - underRepBlocks; + lastUnderRepBlocks = underRepBlocks; + if (blockDecrease < 0 || blockDecrease > (totalBlocks * 0.001)) { + continue; + } + + String liveNodeListString = + fetchNameNodeJMXValue(nameNodeProperties, NAMENODE_INFO_JMX_QUERY, JMX_LIVE_NODES_LIST); + Set datanodesToReport = parseStaleDataNodeList(liveNodeListString, blockThreshold, log); + log.info(String.format("Queueing %d Datanodes for block report: %s", datanodesToReport.size(), + Joiner.on(",").join(datanodesToReport))); + DatanodeInfo[] datanodes = dfs.getDataNodeStats(); + int cnt = 0; + for (DatanodeInfo datanode : datanodes) { + if (datanodesToReport.contains(datanode.getXferAddr(true))) { + if (this.isInterrupted()) { + break; + } + triggerDataNodeBlockReport(conf, datanode.getIpcAddr(true)); + cnt++; + Thread.sleep(1000); + } + } + if (cnt != datanodesToReport.size()) { + log.warn(String.format( + "Found %d Datanodes to queue block reports for but was only able to trigger %d", + datanodesToReport.size(), cnt)); + } + } catch (IOException ioe) { + log.warn("Exception encountered in block report thread", ioe); + } + } + } catch (InterruptedException ie) { + // Do nothing; just exit + } + log.info("Block reporting thread exiting"); + } + }; + blockReportThread.setUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + blockReportThread.start(); + } log.info("Waiting for MissingBlocks to fall below " + totalBlocks*0.0001 + "..."); waitForNameNodeJMXValue("Number of missing blocks", FSNAMESYSTEM_JMX_QUERY, JMX_MISSING_BLOCKS, totalBlocks*0.0001, totalBlocks*0.0001, true, nameNodeProperties, shouldExit, log); @@ -196,6 +283,31 @@ static void waitForNameNodeReadiness(final Properties nameNodeProperties, int nu waitForNameNodeJMXValue("Number of under replicated blocks", FSNAMESYSTEM_STATE_JMX_QUERY, JMX_UNDER_REPLICATED_BLOCKS, totalBlocks*0.01, totalBlocks*0.001, true, nameNodeProperties, shouldExit, log); log.info("NameNode is ready for use!"); + if (blockReportThread != null) { + blockReportThread.interrupt(); + log.debug("Interrupted block report thread; joining"); + blockReportThread.join(5000); + if (blockReportThread.isAlive()) { + log.debug("Joined block report thread"); + } else { + log.warn("Unable to join block report thread after 5s; continuing"); + } + } + } + + /** + * Trigger a block report on a given DataNode. + * @param conf Configuration + * @param dataNodeTarget The target; should be like : + */ + private static void triggerDataNodeBlockReport(Configuration conf, String dataNodeTarget) throws IOException { + InetSocketAddress datanodeAddr = NetUtils.createSocketAddr(dataNodeTarget); + + ClientDatanodeProtocol dnProtocol = DFSUtil.createClientDatanodeProtocolProxy( + datanodeAddr, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getSocketFactory(conf, ClientDatanodeProtocol.class)); + + dnProtocol.triggerBlockReport(new BlockReportOptions.Factory().build()); } /** @@ -243,6 +355,47 @@ private static void waitForNameNodeJMXValue(String valueName, String jmxBeanQuer } } + static Set parseStaleDataNodeList(String liveNodeJsonString, final int blockThreshold, + final Log log) throws IOException { + final Set dataNodesToReport = new HashSet<>(); + + JsonFactory fac = new JsonFactory(); + JsonParser parser = fac.createJsonParser(IOUtils.toInputStream(liveNodeJsonString, StandardCharsets.UTF_8.name())); + + int objectDepth = 0; + String currentNodeAddr = null; + for (JsonToken tok = parser.nextToken(); tok != null; tok = parser.nextToken()) { + if (tok == JsonToken.START_OBJECT) { + objectDepth++; + } else if (tok == JsonToken.END_OBJECT) { + objectDepth--; + } else if (tok == JsonToken.FIELD_NAME) { + if (objectDepth == 1) { + // This is where the Datanode identifiers are stored + currentNodeAddr = parser.getCurrentName(); + } else if (objectDepth == 2) { + if (parser.getCurrentName().equals("numBlocks")) { + JsonToken valueToken = parser.nextToken(); + if (valueToken != JsonToken.VALUE_NUMBER_INT || currentNodeAddr == null) { + throw new IOException(String.format("Malformed LiveNodes JSON; got token = %s; currentNodeAddr = %s: %s", + valueToken, currentNodeAddr, liveNodeJsonString)); + } + int numBlocks = parser.getIntValue(); + if (numBlocks < blockThreshold) { + log.debug(String.format("Queueing Datanode <%s> for block report; numBlocks = %d", + currentNodeAddr, numBlocks)); + dataNodesToReport.add(currentNodeAddr); + } else { + log.debug(String.format("Not queueing Datanode <%s> for block report; numBlocks = %d", + currentNodeAddr, numBlocks)); + } + } + } + } + } + return dataNodesToReport; + } + /** * Fetch a value from the launched NameNode's JMX. * @param nameNodeProperties The set of properties containing information about the NameNode. diff --git a/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java b/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java index 4ffa7860fd..db72aab2ac 100644 --- a/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java +++ b/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java @@ -303,7 +303,7 @@ public Boolean get() { fail("Unable to fetch NameNode properties"); } - DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), 3, falseSupplier, LOG); + DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), 3, false, falseSupplier, LOG); // Test that we can successfully write to / read from the cluster try { diff --git a/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynoInfraUtils.java b/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynoInfraUtils.java new file mode 100644 index 0000000000..1d05d5252c --- /dev/null +++ b/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynoInfraUtils.java @@ -0,0 +1,52 @@ +/** + * Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ +package com.linkedin.dynamometer; + +import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class TestDynoInfraUtils { + + private static final Log LOG = LogFactory.getLog(TestDynoInfraUtils.class); + + @Test + public void testParseStaleDatanodeListSingleDatanode() throws Exception { + // Confirm all types of values can be properly parsed + String json = "{" + + "\"1.2.3.4:5\": {" + + " \"numBlocks\": 5," + + " \"fooString\":\"stringValue\"," + + " \"fooInteger\": 1," + + " \"fooFloat\": 1.0," + + " \"fooArray\": []" + + "}" + + "}"; + Set out = DynoInfraUtils.parseStaleDataNodeList(json, 10, LOG); + assertEquals(1, out.size()); + assertTrue(out.contains("1.2.3.4:5")); + } + + @Test + public void testParseStaleDatanodeListMultipleDatanodes() throws Exception { + String json = "{" + + "\"1.2.3.4:1\": {\"numBlocks\": 0}, " + + "\"1.2.3.4:2\": {\"numBlocks\": 15}, " + + "\"1.2.3.4:3\": {\"numBlocks\": 5}, " + + "\"1.2.3.4:4\": {\"numBlocks\": 10} " + + "}"; + Set out = DynoInfraUtils.parseStaleDataNodeList(json, 10, LOG); + assertEquals(2, out.size()); + assertTrue(out.contains("1.2.3.4:1")); + assertTrue(out.contains("1.2.3.4:3")); + } + +} +