Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public Boolean get() {
LOG.info("Finished requesting datanode containers");

if (launchNameNode) {
DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), numTotalDataNodes, exitCritera, LOG);
DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), numTotalDataNodes, true, exitCritera, LOG);
}

waitForCompletion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,8 @@ public Boolean get() {
}
}
DynoInfraUtils.waitForNameNodeStartup(namenodeProperties.get(), exitCritera, LOG);
DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), numTotalDataNodes, exitCritera, LOG);
DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), numTotalDataNodes, false,
exitCritera, LOG);
break;
} catch (IOException ioe) {
LOG.error("Unexpected exception while waiting for NameNode readiness", ioe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,40 @@
*/
package com.linkedin.dynamometer;

import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
Expand All @@ -43,11 +59,13 @@ public class DynoInfraUtils {
// JMX bean queries: each string names the NameNode MBean that a property is fetched from.
public static final String NAMENODE_STARTUP_PROGRESS_JMX_QUERY = "Hadoop:service=NameNode,name=StartupProgress";
public static final String FSNAMESYSTEM_JMX_QUERY = "Hadoop:service=NameNode,name=FSNamesystem";
public static final String FSNAMESYSTEM_STATE_JMX_QUERY = "Hadoop:service=NameNode,name=FSNamesystemState";
// Bean queried together with JMX_LIVE_NODES_LIST to obtain the live DataNode list.
public static final String NAMENODE_INFO_JMX_QUERY = "Hadoop:service=NameNode,name=NameNodeInfo";
// The JMX property names of various properties.
// Fetched from the FSNamesystem bean (FSNAMESYSTEM_JMX_QUERY) in this class.
public static final String JMX_MISSING_BLOCKS = "MissingBlocks";
// Fetched from the FSNamesystemState bean (FSNAMESYSTEM_STATE_JMX_QUERY) in this class.
public static final String JMX_UNDER_REPLICATED_BLOCKS = "UnderReplicatedBlocks";
// Fetched from the FSNamesystemState bean (FSNAMESYSTEM_STATE_JMX_QUERY) in this class.
public static final String JMX_BLOCKS_TOTAL = "BlocksTotal";
public static final String JMX_LIVE_NODE_COUNT = "NumLiveDataNodes";
// Fetched from the NameNodeInfo bean; its value is handed to parseStaleDataNodeList as a JSON string.
public static final String JMX_LIVE_NODES_LIST = "LiveNodes";

/**
* If a file matching {@value HADOOP_TAR_FILENAME_FORMAT} and {@code version} is
Expand Down Expand Up @@ -179,7 +197,8 @@ static void waitForNameNodeStartup(Properties nameNodeProperties, Supplier<Boole
* @param log Where to log information.
*/
static void waitForNameNodeReadiness(final Properties nameNodeProperties, int numTotalDataNodes,
Supplier<Boolean> shouldExit, final Log log) throws IOException, InterruptedException {
boolean triggerBlockReports, Supplier<Boolean> shouldExit, final Log log)
throws IOException, InterruptedException {
if (shouldExit.get()) {
return;
}
Expand All @@ -189,13 +208,106 @@ static void waitForNameNodeReadiness(final Properties nameNodeProperties, int nu
numTotalDataNodes*0.99, numTotalDataNodes*0.001, false, nameNodeProperties, shouldExit, log);
final int totalBlocks = Integer.parseInt(fetchNameNodeJMXValue(nameNodeProperties, FSNAMESYSTEM_STATE_JMX_QUERY,
JMX_BLOCKS_TOTAL));
Thread blockReportThread = null;
if (triggerBlockReports) {
// This will be significantly lower than the actual expected number of blocks because it does not
// take into account replication factor. However the block reports are pretty binary; either a full
// report has been received or it hasn't. Thus we don't mind the large underestimate here.
final int blockThreshold = totalBlocks / numTotalDataNodes * 2;
// The Configuration object here is based on the host cluster, which may
// have security enabled; we need to disable it to talk to the Dyno NN
final Configuration conf = new Configuration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple");
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "false");
final DistributedFileSystem dfs =
(DistributedFileSystem) FileSystem.get(getNameNodeHdfsUri(nameNodeProperties), conf);
log.info("Launching thread to trigger block reports for Datanodes with <" + blockThreshold + " blocks reported");
// Background helper thread. After each one-minute sleep it samples the combined count of
// missing and under-replicated blocks; when that count has (almost) stopped decreasing, it
// triggers block reports on the DataNodes that parseStaleDataNodeList selects using blockThreshold.
blockReportThread = new Thread() {
@Override
public void run() {
// Here we count both Missing and UnderReplicated within under replicated
// Starts at Long.MAX_VALUE, so the first computed decrease is very large and the
// first pass normally skips triggering (see the threshold check below).
long lastUnderRepBlocks = Long.MAX_VALUE;
try {
while (!this.isInterrupted()) {
try {
// Sleep first; an interrupt during the sleep throws InterruptedException, which
// is caught by the outer try and ends the loop.
Thread.sleep(TimeUnit.MINUTES.toMillis(1));
long underRepBlocks = Long.parseLong(
fetchNameNodeJMXValue(nameNodeProperties, FSNAMESYSTEM_JMX_QUERY, JMX_MISSING_BLOCKS)) +
Long.parseLong(fetchNameNodeJMXValue(nameNodeProperties, FSNAMESYSTEM_STATE_JMX_QUERY,
JMX_UNDER_REPLICATED_BLOCKS));
long blockDecrease = lastUnderRepBlocks - underRepBlocks;
lastUnderRepBlocks = underRepBlocks;
// Skip this round if the count grew (negative decrease) or dropped by more than
// 0.1% of totalBlocks since the previous sample; only act once the count has stalled.
if (blockDecrease < 0 || blockDecrease > (totalBlocks * 0.001)) {
continue;
}

String liveNodeListString =
fetchNameNodeJMXValue(nameNodeProperties, NAMENODE_INFO_JMX_QUERY, JMX_LIVE_NODES_LIST);
Set<String> datanodesToReport = parseStaleDataNodeList(liveNodeListString, blockThreshold, log);
log.info(String.format("Queueing %d Datanodes for block report: %s", datanodesToReport.size(),
Joiner.on(",").join(datanodesToReport)));
DatanodeInfo[] datanodes = dfs.getDataNodeStats();
// Number of DataNodes a block report was actually triggered on in this round.
int cnt = 0;
for (DatanodeInfo datanode : datanodes) {
// The identifiers parsed from the LiveNodes JSON are matched against each
// DataNode's transfer address; the trigger itself is sent to its IPC address.
if (datanodesToReport.contains(datanode.getXferAddr(true))) {
if (this.isInterrupted()) {
break;
}
triggerDataNodeBlockReport(conf, datanode.getIpcAddr(true));
cnt++;
// Pause one second between triggers.
// NOTE(review): presumably to avoid many simultaneous block reports -- confirm.
Thread.sleep(1000);
}
}
// A mismatch means some selected identifiers were not found among the DataNodes
// returned by getDataNodeStats(), or the loop was cut short by an interrupt.
if (cnt != datanodesToReport.size()) {
log.warn(String.format(
"Found %d Datanodes to queue block reports for but was only able to trigger %d",
datanodesToReport.size(), cnt));
}
} catch (IOException ioe) {
// Logged only; the while loop then moves on to its next iteration.
log.warn("Exception encountered in block report thread", ioe);
}
}
} catch (InterruptedException ie) {
// Do nothing; just exit
}
log.info("Block reporting thread exiting");
}
};
blockReportThread.setUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
blockReportThread.start();
}
log.info("Waiting for MissingBlocks to fall below " + totalBlocks*0.0001 + "...");
waitForNameNodeJMXValue("Number of missing blocks", FSNAMESYSTEM_JMX_QUERY, JMX_MISSING_BLOCKS,
totalBlocks*0.0001, totalBlocks*0.0001, true, nameNodeProperties, shouldExit, log);
log.info("Waiting for UnderReplicatedBlocks to fall below " + totalBlocks*0.01 + "...");
waitForNameNodeJMXValue("Number of under replicated blocks", FSNAMESYSTEM_STATE_JMX_QUERY,
JMX_UNDER_REPLICATED_BLOCKS, totalBlocks*0.01, totalBlocks*0.001, true, nameNodeProperties, shouldExit, log);
log.info("NameNode is ready for use!");
// The NameNode is ready, so the block report helper thread (if one was started) is no
// longer needed: interrupt it and give it up to 5 seconds to exit.
if (blockReportThread != null) {
  blockReportThread.interrupt();
  log.debug("Interrupted block report thread; joining");
  blockReportThread.join(5000);
  // join(timeout) returns when the thread dies OR when the timeout elapses, so a thread
  // that is still alive at this point means the join timed out.
  if (blockReportThread.isAlive()) {
    log.warn("Unable to join block report thread after 5s; continuing");
  } else {
    log.debug("Joined block report thread");
  }
}
}

/**
 * Ask a single DataNode to send a block report, by building a
 * {@link ClientDatanodeProtocol} proxy for it and invoking {@code triggerBlockReport}
 * with default-built {@link BlockReportOptions}.
 *
 * @param conf Configuration used to create the proxy and its socket factory.
 * @param dataNodeTarget Address of the DataNode to contact, in {@code host:port} form.
 * @throws IOException If any of the underlying calls fails.
 */
private static void triggerDataNodeBlockReport(Configuration conf, String dataNodeTarget) throws IOException {
  final InetSocketAddress targetAddress = NetUtils.createSocketAddr(dataNodeTarget);
  final UserGroupInformation caller = UserGroupInformation.getCurrentUser();

  final ClientDatanodeProtocol proxy = DFSUtil.createClientDatanodeProtocolProxy(
      targetAddress, caller, conf, NetUtils.getSocketFactory(conf, ClientDatanodeProtocol.class));

  final BlockReportOptions defaultOptions = new BlockReportOptions.Factory().build();
  proxy.triggerBlockReport(defaultOptions);
}

/**
Expand Down Expand Up @@ -243,6 +355,47 @@ private static void waitForNameNodeJMXValue(String valueName, String jmxBeanQuer
}
}

/**
 * Parse a {@code LiveNodes} JSON string and collect the DataNodes that have reported
 * fewer than {@code blockThreshold} blocks.
 *
 * <p>The JSON is read as an object whose top-level field names are the DataNode
 * identifiers and whose values are objects holding an integer {@code numBlocks} field.
 * All other fields, and anything nested more deeply, are ignored.
 *
 * @param liveNodeJsonString The JSON string to parse.
 * @param blockThreshold DataNodes with strictly fewer blocks than this are returned.
 * @param log Where to log per-DataNode debug information.
 * @return The identifiers (top-level field names) of the DataNodes below the threshold.
 * @throws IOException If the JSON cannot be parsed, or a {@code numBlocks} field does not
 *                     hold an integer value.
 */
static Set<String> parseStaleDataNodeList(String liveNodeJsonString, final int blockThreshold,
    final Log log) throws IOException {
  final Set<String> dataNodesToReport = new HashSet<>();

  JsonFactory fac = new JsonFactory();
  // try-with-resources guarantees the parser (and the stream it wraps) is closed on both
  // the normal path and when a malformed document makes parsing throw.
  try (JsonParser parser =
      fac.createJsonParser(IOUtils.toInputStream(liveNodeJsonString, StandardCharsets.UTF_8.name()))) {
    // Depth 1 is the outer object keyed by DataNode; depth 2 is one DataNode's attributes.
    int objectDepth = 0;
    String currentNodeAddr = null;
    for (JsonToken tok = parser.nextToken(); tok != null; tok = parser.nextToken()) {
      if (tok == JsonToken.START_OBJECT) {
        objectDepth++;
      } else if (tok == JsonToken.END_OBJECT) {
        objectDepth--;
      } else if (tok == JsonToken.FIELD_NAME) {
        if (objectDepth == 1) {
          // This is where the Datanode identifiers are stored
          currentNodeAddr = parser.getCurrentName();
        } else if (objectDepth == 2) {
          if (parser.getCurrentName().equals("numBlocks")) {
            // Advance to the field's value; it must be an integer belonging to a known node.
            JsonToken valueToken = parser.nextToken();
            if (valueToken != JsonToken.VALUE_NUMBER_INT || currentNodeAddr == null) {
              throw new IOException(String.format("Malformed LiveNodes JSON; got token = %s; currentNodeAddr = %s: %s",
                  valueToken, currentNodeAddr, liveNodeJsonString));
            }
            int numBlocks = parser.getIntValue();
            if (numBlocks < blockThreshold) {
              log.debug(String.format("Queueing Datanode <%s> for block report; numBlocks = %d",
                  currentNodeAddr, numBlocks));
              dataNodesToReport.add(currentNodeAddr);
            } else {
              log.debug(String.format("Not queueing Datanode <%s> for block report; numBlocks = %d",
                  currentNodeAddr, numBlocks));
            }
          }
        }
      }
    }
  }
  return dataNodesToReport;
}

/**
* Fetch a value from the launched NameNode's JMX.
* @param nameNodeProperties The set of properties containing information about the NameNode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public Boolean get() {
fail("Unable to fetch NameNode properties");
}

DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), 3, falseSupplier, LOG);
DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), 3, false, falseSupplier, LOG);

// Test that we can successfully write to / read from the cluster
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.dynamometer;

import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;


/**
 * Unit tests for {@link DynoInfraUtils#parseStaleDataNodeList}.
 */
public class TestDynoInfraUtils {

  private static final Log LOG = LogFactory.getLog(TestDynoInfraUtils.class);

  /**
   * A single DataNode below the threshold is returned even when its entry also carries
   * string, integer, float and array values next to {@code numBlocks}.
   */
  @Test
  public void testParseStaleDatanodeListSingleDatanode() throws Exception {
    // Confirm all types of values can be properly parsed
    final String liveNodesJson = "{"
        + "\"1.2.3.4:5\": {"
        + " \"numBlocks\": 5,"
        + " \"fooString\":\"stringValue\","
        + " \"fooInteger\": 1,"
        + " \"fooFloat\": 1.0,"
        + " \"fooArray\": []"
        + "}"
        + "}";

    final Set<String> staleNodes = DynoInfraUtils.parseStaleDataNodeList(liveNodesJson, 10, LOG);

    assertTrue(staleNodes.contains("1.2.3.4:5"));
    assertEquals(1, staleNodes.size());
  }

  /**
   * With a threshold of 10, only the DataNodes reporting 0 and 5 blocks are selected;
   * those reporting 10 and 15 blocks are not.
   */
  @Test
  public void testParseStaleDatanodeListMultipleDatanodes() throws Exception {
    final String liveNodesJson = "{"
        + "\"1.2.3.4:1\": {\"numBlocks\": 0}, "
        + "\"1.2.3.4:2\": {\"numBlocks\": 15}, "
        + "\"1.2.3.4:3\": {\"numBlocks\": 5}, "
        + "\"1.2.3.4:4\": {\"numBlocks\": 10} "
        + "}";

    final Set<String> staleNodes = DynoInfraUtils.parseStaleDataNodeList(liveNodesJson, 10, LOG);

    assertTrue(staleNodes.contains("1.2.3.4:1"));
    assertTrue(staleNodes.contains("1.2.3.4:3"));
    assertEquals(2, staleNodes.size());
  }

}