diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicy.java index bd99db403d69b8..2fe3fd1a877ef1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicy.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicy.java @@ -36,15 +36,25 @@ public abstract class TrashPolicy extends Configured { protected Path trash; // path to trash directory protected long deletionInterval; // deletion interval for Emptier + /** + * Used to setup the trash policy. Must be implemented by all TrashPolicy + * implementations. + * @param conf the configuration to be used + * @param fs the filesystem to be used + * @param home the home directory + * @deprecated Use {@link #initialize(Configuration, FileSystem)} instead. + */ + @Deprecated + public abstract void initialize(Configuration conf, FileSystem fs, Path home); + /** * Used to setup the trash policy. Must be implemented by all TrashPolicy * implementations. Different from initialize(conf, fs, home), this one does * not assume trash always under /user/$USER due to HDFS encryption zone. * @param conf the configuration to be used * @param fs the filesystem to be used - * @throws IOException */ - public void initialize(Configuration conf, FileSystem fs) throws IOException{ + public void initialize(Configuration conf, FileSystem fs) { throw new UnsupportedOperationException(); } @@ -99,6 +109,25 @@ public Path getCurrentTrashDir(Path path) throws IOException { */ public abstract Runnable getEmptier() throws IOException; + /** + * Get an instance of the configured TrashPolicy based on the value + * of the configuration parameter fs.trash.classname. + * + * @param conf the configuration to be used + * @param fs the file system to be used + * @param home the home directory + * @return an instance of TrashPolicy + * @deprecated Use {@link #getInstance(Configuration, FileSystem)} instead. + */ + @Deprecated + public static TrashPolicy getInstance(Configuration conf, FileSystem fs, Path home) { + Class trashClass = conf.getClass( + "fs.trash.classname", TrashPolicyDefault.class, TrashPolicy.class); + TrashPolicy trash = ReflectionUtils.newInstance(trashClass, conf); + trash.initialize(conf, fs, home); // initialize TrashPolicy + return trash; + } + /** * Get an instance of the configured TrashPolicy based on the value * of the configuration parameter fs.trash.classname. @@ -107,8 +136,7 @@ public Path getCurrentTrashDir(Path path) throws IOException { * @param fs the file system to be used * @return an instance of TrashPolicy */ - public static TrashPolicy getInstance(Configuration conf, FileSystem fs) - throws IOException { + public static TrashPolicy getInstance(Configuration conf, FileSystem fs) { Class trashClass = conf.getClass( "fs.trash.classname", TrashPolicyDefault.class, TrashPolicy.class); TrashPolicy trash = ReflectionUtils.newInstance(trashClass, conf); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java index f4a825c451eab2..72222be04a8ed8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java @@ -75,6 +75,21 @@ private TrashPolicyDefault(FileSystem fs, Configuration conf) initialize(conf, fs); } + /** + * @deprecated Use {@link #initialize(Configuration, FileSystem)} instead. + */ + @Override + @Deprecated + public void initialize(Configuration conf, FileSystem fs, Path home) { + this.fs = fs; + this.deletionInterval = (long)(conf.getFloat( + FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT) + * MSECS_PER_MINUTE); + this.emptierInterval = (long)(conf.getFloat( + FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT) + * MSECS_PER_MINUTE); + } + @Override public void initialize(Configuration conf, FileSystem fs) { this.fs = fs; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java index 2a9c5d0bc2ad04..20a4cd6c0138da 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java @@ -60,20 +60,22 @@ public Configuration getConf() { * Are the native snappy libraries loaded & initialized? */ public static void checkNativeCodeLoaded() { - if (!NativeCodeLoader.isNativeCodeLoaded() || - !NativeCodeLoader.buildSupportsSnappy()) { - throw new RuntimeException("native snappy library not available: " + - "this version of libhadoop was built without " + - "snappy support."); - } - if (!SnappyCompressor.isNativeCodeLoaded()) { - throw new RuntimeException("native snappy library not available: " + - "SnappyCompressor has not been loaded."); - } - if (!SnappyDecompressor.isNativeCodeLoaded()) { - throw new RuntimeException("native snappy library not available: " + - "SnappyDecompressor has not been loaded."); - } + if (!NativeCodeLoader.buildSupportsSnappy()) { + throw new RuntimeException("native snappy library not available: " + + "this version of libhadoop was built without " + + "snappy support."); + } + if (!NativeCodeLoader.isNativeCodeLoaded()) { + throw new RuntimeException("Failed to load libhadoop."); + } + if (!SnappyCompressor.isNativeCodeLoaded()) { + throw new RuntimeException("native snappy library not available: " + + "SnappyCompressor has not been loaded."); + } + if (!SnappyDecompressor.isNativeCodeLoaded()) { + throw new RuntimeException("native snappy library not available: " + + "SnappyDecompressor has not been loaded."); + } } public static boolean isNativeCodeLoaded() { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java index 9b4cbcf0e5063c..556613639bf1f0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.ipc.Server.Call; @@ -37,14 +38,10 @@ public ExternalCall(PrivilegedExceptionAction action) { public abstract UserGroupInformation getRemoteUser(); - public final T get() throws IOException, InterruptedException { + public final T get() throws InterruptedException, ExecutionException { waitForCompletion(); if (error != null) { - if (error instanceof IOException) { - throw (IOException)error; - } else { - throw new IOException(error); - } + throw new ExecutionException(error); } return result; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogLevel.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogLevel.java index 4fa839f5e549c6..79eae123144933 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogLevel.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogLevel.java @@ -47,15 +47,17 @@ import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; import org.apache.hadoop.security.ssl.SSLFactory; +import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.ServletUtil; import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; /** * Change log level in runtime. */ @InterfaceStability.Evolving public class LogLevel { - public static final String USAGES = "\nUsage: General options are:\n" + public static final String USAGES = "\nUsage: Command options are:\n" + "\t[-getlevel [-protocol (http|https)]\n" + "\t[-setlevel " + "[-protocol (http|https)]\n"; @@ -67,7 +69,7 @@ public class LogLevel { */ public static void main(String[] args) throws Exception { CLI cli = new CLI(new Configuration()); - System.exit(cli.run(args)); + System.exit(ToolRunner.run(cli, args)); } /** @@ -81,6 +83,7 @@ private enum Operations { private static void printUsage() { System.err.println(USAGES); + GenericOptionsParser.printGenericCommandUsage(System.err); } public static boolean isValidProtocol(String protocol) { @@ -107,7 +110,7 @@ public int run(String[] args) throws Exception { sendLogLevelRequest(); } catch (HadoopIllegalArgumentException e) { printUsage(); - throw e; + return -1; } return 0; } diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/ClusterSetup.md b/hadoop-common-project/hadoop-common/src/site/markdown/ClusterSetup.md index f2227690e6c55b..a6f7e716c9251b 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/ClusterSetup.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/ClusterSetup.md @@ -35,7 +35,7 @@ Installation Installing a Hadoop cluster typically involves unpacking the software on all the machines in the cluster or installing it via a packaging system as appropriate for your operating system. It is important to divide up the hardware into functions. -Typically one machine in the cluster is designated as the NameNode and another machine the as ResourceManager, exclusively. These are the masters. Other services (such as Web App Proxy Server and MapReduce Job History server) are usually run either on dedicated hardware or on shared infrastrucutre, depending upon the load. +Typically one machine in the cluster is designated as the NameNode and another machine as the ResourceManager, exclusively. These are the masters. Other services (such as Web App Proxy Server and MapReduce Job History server) are usually run either on dedicated hardware or on shared infrastrucutre, depending upon the load. The rest of the machines in the cluster act as both DataNode and NodeManager. These are the workers. diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/CommandsManual.md b/hadoop-common-project/hadoop-common/src/site/markdown/CommandsManual.md index 4d7d5044ad5584..2ece71a3beac02 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/CommandsManual.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/CommandsManual.md @@ -202,7 +202,9 @@ Manage keys via the KeyProvider. For details on KeyProviders, see the [Transpare Providers frequently require that a password or other secret is supplied. If the provider requires a password and is unable to find one, it will use a default password and emit a warning message that the default password is being used. If the `-strict` flag is supplied, the warning message becomes an error message and the command returns immediately with an error status. -NOTE: Some KeyProviders (e.g. org.apache.hadoop.crypto.key.JavaKeyStoreProvider) does not support uppercase key names. +NOTE: Some KeyProviders (e.g. org.apache.hadoop.crypto.key.JavaKeyStoreProvider) do not support uppercase key names. + +NOTE: Some KeyProviders do not directly execute a key deletion (e.g. performs a soft-delete instead, or delay the actual deletion, to prevent mistake). In these cases, one may encounter errors when creating/deleting a key with the same name after deleting it. Please check the underlying KeyProvider for details. ### `trace` diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Compatibility.md b/hadoop-common-project/hadoop-common/src/site/markdown/Compatibility.md index d7827b5f316620..05b18b59298e01 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Compatibility.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Compatibility.md @@ -68,7 +68,7 @@ Wire compatibility concerns data being transmitted over the wire between Hadoop #### Use Cases * Client-Server compatibility is required to allow users to continue using the old clients even after upgrading the server (cluster) to a later version (or vice versa). For example, a Hadoop 2.1.0 client talking to a Hadoop 2.3.0 cluster. -* Client-Server compatibility is also required to allow users to upgrade the client before upgrading the server (cluster). For example, a Hadoop 2.4.0 client talking to a Hadoop 2.3.0 cluster. This allows deployment of client-side bug fixes ahead of full cluster upgrades. Note that new cluster features invoked by new client APIs or shell commands will not be usable. YARN applications that attempt to use new APIs (including new fields in data structures) that have not yet deployed to the cluster can expect link exceptions. +* Client-Server compatibility is also required to allow users to upgrade the client before upgrading the server (cluster). For example, a Hadoop 2.4.0 client talking to a Hadoop 2.3.0 cluster. This allows deployment of client-side bug fixes ahead of full cluster upgrades. Note that new cluster features invoked by new client APIs or shell commands will not be usable. YARN applications that attempt to use new APIs (including new fields in data structures) that have not yet been deployed to the cluster can expect link exceptions. * Client-Server compatibility is also required to allow upgrading individual components without upgrading others. For example, upgrade HDFS from version 2.1.0 to 2.2.0 without upgrading MapReduce. * Server-Server compatibility is required to allow mixed versions within an active cluster so the cluster may be upgraded without downtime in a rolling fashion. @@ -76,7 +76,7 @@ Wire compatibility concerns data being transmitted over the wire between Hadoop * Both Client-Server and Server-Server compatibility is preserved within a major release. (Different policies for different categories are yet to be considered.) * Compatibility can be broken only at a major release, though breaking compatibility even at major releases has grave consequences and should be discussed in the Hadoop community. -* Hadoop protocols are defined in .proto (ProtocolBuffers) files. Client-Server protocols and Server-protocol .proto files are marked as stable. When a .proto file is marked as stable it means that changes should be made in a compatible fashion as described below: +* Hadoop protocols are defined in .proto (ProtocolBuffers) files. Client-Server protocols and Server-Server protocol .proto files are marked as stable. When a .proto file is marked as stable it means that changes should be made in a compatible fashion as described below: * The following changes are compatible and are allowed at any time: * Add an optional field, with the expectation that the code deals with the field missing due to communication with an older version of the code. * Add a new rpc/method to the service @@ -101,7 +101,7 @@ Wire compatibility concerns data being transmitted over the wire between Hadoop ### Java Binary compatibility for end-user applications i.e. Apache Hadoop ABI -As Apache Hadoop revisions are upgraded end-users reasonably expect that their applications should continue to work without any modifications. This is fulfilled as a result of support API compatibility, Semantic compatibility and Wire compatibility. +As Apache Hadoop revisions are upgraded end-users reasonably expect that their applications should continue to work without any modifications. This is fulfilled as a result of supporting API compatibility, Semantic compatibility and Wire compatibility. However, Apache Hadoop is a very complex, distributed system and services a very wide variety of use-cases. In particular, Apache Hadoop MapReduce is a very, very wide API; in the sense that end-users may make wide-ranging assumptions such as layout of the local disk when their map/reduce tasks are executing, environment variables for their tasks etc. In such cases, it becomes very hard to fully specify, and support, absolute compatibility. @@ -115,12 +115,12 @@ However, Apache Hadoop is a very complex, distributed system and services a very * Existing MapReduce, YARN & HDFS applications and frameworks should work unmodified within a major release i.e. Apache Hadoop ABI is supported. * A very minor fraction of applications maybe affected by changes to disk layouts etc., the developer community will strive to minimize these changes and will not make them within a minor version. In more egregious cases, we will consider strongly reverting these breaking changes and invalidating offending releases if necessary. -* In particular for MapReduce applications, the developer community will try our best to support provide binary compatibility across major releases e.g. applications using org.apache.hadoop.mapred. +* In particular for MapReduce applications, the developer community will try our best to support providing binary compatibility across major releases e.g. applications using org.apache.hadoop.mapred. * APIs are supported compatibly across hadoop-1.x and hadoop-2.x. See [Compatibility for MapReduce applications between hadoop-1.x and hadoop-2.x](../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduce_Compatibility_Hadoop1_Hadoop2.html) for more details. ### REST APIs -REST API compatibility corresponds to both the request (URLs) and responses to each request (content, which may contain other URLs). Hadoop REST APIs are specifically meant for stable use by clients across releases, even major releases. The following are the exposed REST APIs: +REST API compatibility corresponds to both the requests (URLs) and responses to each request (content, which may contain other URLs). Hadoop REST APIs are specifically meant for stable use by clients across releases, even major ones. The following are the exposed REST APIs: * [WebHDFS](../hadoop-hdfs/WebHDFS.html) - Stable * [ResourceManager](../../hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html) @@ -136,7 +136,7 @@ The APIs annotated stable in the text above preserve compatibility across at lea ### Metrics/JMX -While the Metrics API compatibility is governed by Java API compatibility, the actual metrics exposed by Hadoop need to be compatible for users to be able to automate using them (scripts etc.). Adding additional metrics is compatible. Modifying (eg changing the unit or measurement) or removing existing metrics breaks compatibility. Similarly, changes to JMX MBean object names also break compatibility. +While the Metrics API compatibility is governed by Java API compatibility, the actual metrics exposed by Hadoop need to be compatible for users to be able to automate using them (scripts etc.). Adding additional metrics is compatible. Modifying (e.g. changing the unit or measurement) or removing existing metrics breaks compatibility. Similarly, changes to JMX MBean object names also break compatibility. #### Policy @@ -148,7 +148,7 @@ User and system level data (including metadata) is stored in files of different #### User-level file formats -Changes to formats that end-users use to store their data can prevent them for accessing the data in later releases, and hence it is highly important to keep those file-formats compatible. One can always add a "new" format improving upon an existing format. Examples of these formats include har, war, SequenceFileFormat etc. +Changes to formats that end-users use to store their data can prevent them from accessing the data in later releases, and hence it is highly important to keep those file-formats compatible. One can always add a "new" format improving upon an existing format. Examples of these formats include har, war, SequenceFileFormat etc. ##### Policy @@ -185,7 +185,7 @@ Depending on the degree of incompatibility in the changes, the following potenti ### Command Line Interface (CLI) -The Hadoop command line programs may be use either directly via the system shell or via shell scripts. Changing the path of a command, removing or renaming command line options, the order of arguments, or the command return code and output break compatibility and may adversely affect users. +The Hadoop command line programs may be used either directly via the system shell or via shell scripts. Changing the path of a command, removing or renaming command line options, the order of arguments, or the command return code and output break compatibility and may adversely affect users. #### Policy diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/InterfaceClassification.md b/hadoop-common-project/hadoop-common/src/site/markdown/InterfaceClassification.md index 07abdac40b7a41..b5d6ce0c4ba0ae 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/InterfaceClassification.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/InterfaceClassification.md @@ -44,15 +44,15 @@ Interfaces have two main attributes: Audience and Stability Audience denotes the potential consumers of the interface. While many interfaces are internal/private to the implementation, other are public/external interfaces -are meant for wider consumption by applications and/or clients. For example, in +that are meant for wider consumption by applications and/or clients. For example, in posix, libc is an external or public interface, while large parts of the kernel are internal or private interfaces. Also, some interfaces are targeted towards other specific subsystems. -Identifying the audience of an interface helps define the impact of breaking +Identifying the audience of an interface helps defining the impact of breaking it. For instance, it might be okay to break the compatibility of an interface whose audience is a small number of specific subsystems. On the other hand, it -is probably not okay to break a protocol interfaces that millions of Internet +is probably not okay to break a protocol interface that millions of Internet users depend on. Hadoop uses the following kinds of audience in order of increasing/wider visibility: @@ -75,14 +75,14 @@ referred to as project-private). The interface is used by a specified set of projects or systems (typically closely related projects). Other projects or systems should not use the -interface. Changes to the interface will be communicated/ negotiated with the +interface. Changes to the interface will be communicated/negotiated with the specified projects. For example, in the Hadoop project, some interfaces are LimitedPrivate{HDFS, MapReduce} in that they are private to the HDFS and MapReduce projects. #### Public -The interface is for general use by any application. +The interface is for general use by any applications. ### Stability @@ -92,16 +92,16 @@ the interface are allowed. Hadoop APIs have the following levels of stability. #### Stable Can evolve while retaining compatibility for minor release boundaries; in other -words, incompatible changes to APIs marked Stable are allowed only at major +words, incompatible changes to APIs marked as Stable are allowed only at major releases (i.e. at m.0). #### Evolving -Evolving, but incompatible changes are allowed at minor release (i.e. m .x) +Evolving, but incompatible changes are allowed at minor releases (i.e. m .x) #### Unstable -Incompatible changes to Unstable APIs are allowed any time. This usually makes +Incompatible changes to Unstable APIs are allowed at any time. This usually makes sense for only private interfaces. However one may call this out for a supposedly public interface to highlight @@ -109,11 +109,11 @@ that it should not be used as an interface; for public interfaces, labeling it as Not-an-interface is probably more appropriate than "Unstable". Examples of publicly visible interfaces that are unstable -(i.e. not-an-interface): GUI, CLIs whose output format will change +(i.e. not-an-interface): GUI, CLIs whose output format will change. #### Deprecated -APIs that could potentially removed in the future and should not be used. +APIs that could potentially be removed in the future and should not be used. How are the Classifications Recorded? ------------------------------------- @@ -153,15 +153,15 @@ FAQ stable they capture internal properties of the system and can communicate these properties to its internal users and to developers of the interface. * e.g. In HDFS, NN-DN protocol is private but stable and can help - implement rolling upgrades. It communicates that this interface should + implementing rolling upgrades. It communicates that this interface should not be changed in incompatible ways even though it is private. - * e.g. In HDFS, FSImage stability can help provide more flexible roll backs. + * e.g. In HDFS, FSImage stability can help providing more flexible roll backs. * What is the harm in applications using a private interface that is stable? How is it different than a public stable interface? * While a private interface marked as stable is targeted to change only at major releases, it may break at other times if the providers of that - interface are willing to changes the internal users of that + interface are willing to change the internal users of that interface. Further, a public stable interface is less likely to break even at major releases (even though it is allowed to break compatibility) because the impact of the change is larger. If you use a private interface @@ -185,7 +185,7 @@ FAQ * A good example of a limited-private interface is BlockLocations, This is fairly low-level interface that we are willing to expose to MR and perhaps HBase. We are likely to change it down the road and at that time we will - have get a coordinated effort with the MR team to release matching + have got a coordinated effort with the MR team to release matching releases. While MR and HDFS are always released in sync today, they may change down the road. * If you have a limited-private interface with many projects listed then you @@ -207,7 +207,7 @@ FAQ break it at minor releases. * One example of a public interface that is unstable is where one is providing an implementation of a standards-body based interface that is - still under development. For example, many companies, in an attampt to be + still under development. For example, many companies, in an attempt to be first to market, have provided implementations of a new NFS protocol even when the protocol was not fully completed by IETF. The implementor cannot evolve the interface in a fashion that causes least distruption because diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md index 2c9dd5d29dacbd..cc936b4b704334 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md @@ -35,7 +35,7 @@ of the client. **Implementation Note**: the static `FileSystem get(URI uri, Configuration conf) ` method MAY return a pre-existing instance of a filesystem client class—a class that may also be in use in other threads. -The implementations of `FileSystem` which ship with Apache Hadoop +The implementations of `FileSystem` shipped with Apache Hadoop *do not make any attempt to synchronize access to the working directory field*. ## Invariants @@ -105,8 +105,6 @@ may differ from the local user account name. of the caller.** -#### Preconditions - #### Postconditions @@ -214,7 +212,6 @@ response, then, if a listing `listStatus("/d")` takes place concurrently with th [a, part-0000001, ... , part-9999999] [part-0000001, ... , part-9999999, z] - [a, part-0000001, ... , part-9999999, z] [part-0000001, ... , part-9999999] @@ -282,7 +279,7 @@ value is an instance of the `LocatedFileStatus` subclass of a `FileStatus`, and that rather than return an entire list, an iterator is returned. This is actually a `protected` method, directly invoked by -`listLocatedStatus(Path path):`. Calls to it may be delegated through +`listLocatedStatus(Path path)`. Calls to it may be delegated through layered filesystems, such as `FilterFileSystem`, so its implementation MUST be considered mandatory, even if `listLocatedStatus(Path path)` has been implemented in a different manner. There are open JIRAs proposing @@ -442,10 +439,9 @@ the convention is generally retained. ### `long getDefaultBlockSize()` -Get the "default" block size for a filesystem. This often used during +Get the "default" block size for a filesystem. This is often used during split calculations to divide work optimally across a set of worker processes. -#### Preconditions #### Postconditions @@ -466,8 +462,6 @@ A FileSystem MAY make this user-configurable (the S3 and Swift filesystem client Get the "default" block size for a path —that is, the block size to be used when writing objects to a path in the filesystem. -#### Preconditions - #### Postconditions @@ -604,7 +598,7 @@ This MAY be a bug, as it allows >1 client to create a file with `overwrite==fals and potentially confuse file/directory logic * The Local FileSystem raises a `FileNotFoundException` when trying to create a file over -a directory, hence it is is listed as an exception that MAY be raised when +a directory, hence it is listed as an exception that MAY be raised when this precondition fails. * Not covered: symlinks. The resolved path of the symlink is used as the final path argument to the `create()` operation @@ -898,8 +892,8 @@ Renaming a file where the destination is a directory moves the file as a child ##### Renaming a directory onto a directory If `src` is a directory then all its children will then exist under `dest`, while the path -`src` and its descendants will no longer not exist. The names of the paths under -`dest` will match those under `src`, as will the contents: +`src` and its descendants will no longer exist. The names of the paths under +`dest` will match those under `src`, as will the contents do: if isDir(FS, src) isDir(FS, dest) and src != dest : FS' where: @@ -928,7 +922,7 @@ The outcome is no change to FileSystem state, with a return value of false. *Local Filesystem, S3N* The outcome is as a normal rename, with the additional (implicit) feature -that the parent directores of the destination also exist +that the parent directores of the destination also exist. exists(FS', parent(dest)) @@ -1018,9 +1012,9 @@ HDFS: All source files except the final one MUST be a complete block: HDFS's restrictions may be an implementation detail of how it implements -`concat` -by changing the inode references to join them together in +`concat` by changing the inode references to join them together in a sequence. As no other filesystem in the Hadoop core codebase -implements this method, there is no way to distinguish implementation detail. +implements this method, there is no way to distinguish implementation detail from specification. diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md index adecd57023362d..eb5128f6d290b4 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md @@ -29,7 +29,7 @@ with extensions that add key assumptions to the system. 1. The stream being read references a finite array of bytes. 1. The length of the data does not change during the read process. 1. The contents of the data does not change during the process. -1. The source file remains present during the read process +1. The source file remains present during the read process. 1. Callers may use `Seekable.seek()` to offsets within the array of bytes, with future reads starting at this offset. 1. The cost of forward and backward seeks is low. @@ -104,7 +104,7 @@ Return the current position. The outcome when a stream is closed is undefined. Return the data at the current position. -1. Implementations should fail when a stream is closed +1. Implementations should fail when a stream is closed. 1. There is no limit on how long `read()` may take to complete. #### Preconditions @@ -124,7 +124,7 @@ Return the data at the current position. Read `length` bytes of data into the destination buffer, starting at offset `offset`. The source of the data is the current position of the stream, -as implicitly set in `pos` +as implicitly set in `pos`. #### Preconditions @@ -166,7 +166,7 @@ the stream. That is, rather than `l` being simply defined as `min(length, len(data)-length)`, it strictly is an integer in the range `1..min(length, len(data)-length)`. -While the caller may expect for as much as the buffer as possible to be filled +While the caller may expect for as much buffer as possible to be filled in, it is within the specification for an implementation to always return a smaller number, perhaps only ever 1 byte. @@ -192,7 +192,7 @@ Some filesystems do not perform this check, relying on the `read()` contract to reject reads on a closed stream (e.g. `RawLocalFileSystem`). A `seek(0)` MUST always succeed, as the seek position must be -positive and less than the length of the Stream's: +positive and less than the length of the Stream: s > 0 and ((s==0) or ((s < len(data)))) else raise [EOFException, IOException] @@ -222,7 +222,7 @@ data at offset `offset`. #### Preconditions -Not all subclasses implement the operation operation, and instead +Not all subclasses implement this operation, and instead either raise an exception or return `False`. supported(FSDIS, Seekable.seekToNewSource) else raise [UnsupportedOperationException, IOException] @@ -250,7 +250,7 @@ If the operation is supported and there is a new location for the data: The new data is the original data (or an updated version of it, as covered in the Consistency section below), but the block containing the data at `offset` -sourced from a different replica. +is sourced from a different replica. If there is no other copy, `FSDIS` is not updated; the response indicates this: @@ -258,7 +258,7 @@ If there is no other copy, `FSDIS` is not updated; the response indicates this: Outside of test methods, the primary use of this method is in the {{FSInputChecker}} class, which can react to a checksum error in a read by attempting to source -the data elsewhere. It a new source can be found it attempts to reread and +the data elsewhere. If a new source can be found it attempts to reread and recheck that portion of the file. ## interface `PositionedReadable` diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/introduction.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/introduction.md index 22b39d4afa6b21..eece512c1a0918 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/introduction.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/introduction.md @@ -141,7 +141,7 @@ The failure modes when a user lacks security permissions are not specified. ### Networking Assumptions -This document assumes this all network operations succeed. All statements +This document assumes that all network operations succeed. All statements can be assumed to be qualified as *"assuming the operation does not fail due to a network availability problem"* @@ -303,7 +303,7 @@ does not hold on blob stores] 1. Directory list operations are fast for directories with few entries, but may incur a cost that is `O(entries)`. Hadoop 2 added iterative listing to handle the challenge of listing directories with millions of entries without -buffering -at the cost of consistency. +buffering at the cost of consistency. 1. A `close()` of an `OutputStream` is fast, irrespective of whether or not the file operation has succeeded or not. @@ -317,7 +317,7 @@ This specification refers to *Object Stores* in places, often using the term *Blobstore*. Hadoop does provide FileSystem client classes for some of these even though they violate many of the requirements. This is why, although Hadoop can read and write data in an object store, the two which Hadoop ships -with direct support for —Amazon S3 and OpenStack Swift&mdash cannot +with direct support for — Amazon S3 and OpenStack Swift — cannot be used as direct replacement for HDFS. *What is an Object Store?* @@ -358,10 +358,10 @@ are current with respect to the files within that directory. as are `delete()` operations. Object store FileSystem clients implement these as operations on the individual objects whose names match the directory prefix. As a result, the changes take place a file at a time, and are not atomic. If -an operation fails part way through the process, the the state of the object store +an operation fails part way through the process, then the state of the object store reflects the partially completed operation. Note also that client code assumes that these operations are `O(1)` —in an object store they are -more likely to be be `O(child-entries)`. +more likely to be `O(child-entries)`. 1. **Durability**. Hadoop assumes that `OutputStream` implementations write data to their (persistent) storage on a `flush()` operation. Object store implementations diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/model.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/model.md index e04a640b656c34..e121c92deeddc4 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/model.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/model.md @@ -18,7 +18,7 @@ ## Paths and Path Elements -A Path is a list of Path elements which represents a path to a file, directory of symbolic link +A Path is a list of Path elements which represents a path to a file, directory or symbolic link Path elements are non-empty strings. The exact set of valid strings MAY be specific to a particular FileSystem implementation. @@ -179,7 +179,7 @@ path begins with the path P -that is their parent is P or an ancestor is P ### File references -A path MAY refer to a file; that it it has data in the filesystem; its path is a key in the data dictionary +A path MAY refer to a file that has data in the filesystem; its path is a key in the data dictionary def isFile(FS, p) = p in FS.Files @@ -206,7 +206,8 @@ process working with the filesystem: The function `getHomeDirectory` returns the home directory for the Filesystem and the current user account. For some FileSystems, the path is `["/","users", System.getProperty("user-name")]`. However, -for HDFS, +for HDFS, the username is derived from the credentials used to authenticate the client with HDFS. +This may differ from the local user account name. ### Exclusivity diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/notation.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/notation.md index aa310f80662747..472bb5dd7ddb5a 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/notation.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/notation.md @@ -130,7 +130,7 @@ Strings are lists of characters represented in double quotes. e.g. `"abc"` All system state declarations are immutable. -The suffix "'" (single quote) is used as the convention to indicate the state of the system after a operation: +The suffix "'" (single quote) is used as the convention to indicate the state of the system after an operation: L' = L + ['d','e'] diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/testing.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/testing.md index 6619332fc0aaf5..6823e0c6a05a58 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/testing.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/testing.md @@ -28,7 +28,7 @@ remote server providing the filesystem. These filesystem bindings must be defined in an XML configuration file, usually `hadoop-common-project/hadoop-common/src/test/resources/contract-test-options.xml`. -This file is excluded should not be checked in. +This file is excluded and should not be checked in. ### ftp:// @@ -122,7 +122,7 @@ new contract class, then creating a new non-abstract test class for every test suite that you wish to test. 1. Do not try and add these tests into Hadoop itself. They won't be added to -the soutce tree. The tests must live with your own filesystem source. +the source tree. The tests must live with your own filesystem source. 1. Create a package in your own test source tree (usually) under `contract`, for the files and tests. 1. Subclass `AbstractFSContract` for your own contract implementation. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestTrash.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestTrash.java index 2aba01f8972f29..338aff6e8d4544 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestTrash.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestTrash.java @@ -691,6 +691,10 @@ public static void main(String [] arg) throws IOException{ public static class TestTrashPolicy extends TrashPolicy { public TestTrashPolicy() { } + @Override + public void initialize(Configuration conf, FileSystem fs, Path home) { + } + @Override public void initialize(Configuration conf, FileSystem fs) { } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 92d91839a10541..72b603aa29bd4e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -72,6 +72,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -989,8 +990,9 @@ public Void run() throws Exception { try { exceptionCall.get(); fail("didn't throw"); - } catch (IOException ioe) { - assertEquals(expectedIOE.getMessage(), ioe.getMessage()); + } catch (ExecutionException ee) { + assertTrue((ee.getCause()) instanceof IOException); + assertEquals(expectedIOE.getMessage(), ee.getCause().getMessage()); } } finally { server.stop(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java index 46948f96b1509b..001bc92b54578b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java @@ -48,6 +48,9 @@ import java.util.Map; import java.util.Map.Entry; +import static org.apache.hadoop.hdfs.DFSConfigKeys.HTTPFS_BUFFER_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.HTTP_BUFFER_SIZE_DEFAULT; + /** * FileSystem operation executors used by {@link HttpFSServer}. */ @@ -462,7 +465,8 @@ public Void execute(FileSystem fs) throws IOException { blockSize = fs.getDefaultBlockSize(path); } FsPermission fsPermission = new FsPermission(permission); - int bufferSize = fs.getConf().getInt("httpfs.buffer.size", 4096); + int bufferSize = fs.getConf().getInt(HTTPFS_BUFFER_SIZE_KEY, + HTTP_BUFFER_SIZE_DEFAULT); OutputStream os = fs.create(path, fsPermission, override, bufferSize, replication, blockSize, null); IOUtils.copyBytes(is, os, bufferSize, true); os.close(); @@ -752,7 +756,8 @@ public FSOpen(String path) { */ @Override public InputStream execute(FileSystem fs) throws IOException { - int bufferSize = HttpFSServerWebApp.get().getConfig().getInt("httpfs.buffer.size", 4096); + int bufferSize = HttpFSServerWebApp.get().getConfig().getInt( + HTTPFS_BUFFER_SIZE_KEY, HTTP_BUFFER_SIZE_DEFAULT); return fs.open(path, bufferSize); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java index 0b767bec6d00a3..61d3b4505fe300 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/service/hadoop/FileSystemAccessService.java @@ -50,6 +50,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; + @InterfaceAudience.Private public class FileSystemAccessService extends BaseService implements FileSystemAccess { private static final Logger LOG = LoggerFactory.getLogger(FileSystemAccessService.class); @@ -159,7 +161,7 @@ protected void init() throws ServiceException { throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_PRINCIPAL); } Configuration conf = new Configuration(); - conf.set("hadoop.security.authentication", "kerberos"); + conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(conf); try { UserGroupInformation.loginUserFromKeytab(principal, keytab); @@ -169,7 +171,7 @@ protected void init() throws ServiceException { LOG.info("Using FileSystemAccess Kerberos authentication, principal [{}] keytab [{}]", principal, keytab); } else if (security.equals("simple")) { Configuration conf = new Configuration(); - conf.set("hadoop.security.authentication", "simple"); + conf.set(HADOOP_SECURITY_AUTHENTICATION, "simple"); UserGroupInformation.setConfiguration(conf); LOG.info("Using FileSystemAccess simple/pseudo authentication, principal [{}]", System.getProperty("user.name")); } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h index c856928c1785aa..83c1c5902a9c61 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h @@ -493,6 +493,7 @@ extern "C" { * complete before proceeding with further file updates. * -1 on error. */ + LIBHDFS_EXTERNAL int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength); /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index df45e2a6346146..10c0ad694ad62c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -70,6 +70,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.webhdfs.ugi.expire.after.access"; public static final int DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT = 10*60*1000; //10 minutes + public static final String DFS_WEBHDFS_USE_IPC_CALLQ = + "dfs.webhdfs.use.ipc.callq"; + public static final boolean DFS_WEBHDFS_USE_IPC_CALLQ_DEFAULT = true; // HA related configuration public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration"; @@ -992,6 +995,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.disk.balancer.plan.threshold.percent"; public static final int DFS_DISK_BALANCER_PLAN_THRESHOLD_DEFAULT = 10; + public static final String HTTPFS_BUFFER_SIZE_KEY = + "httpfs.buffer.size"; + public static final int HTTP_BUFFER_SIZE_DEFAULT = 4096; // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 2471dc818cbbbf..8c591861fca585 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -89,6 +89,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*; import static org.apache.hadoop.util.Time.now; import static org.apache.hadoop.util.Time.monotonicNow; +import static org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics.TOPMETRICS_METRICS_SOURCE_NAME; import java.io.BufferedWriter; import java.io.ByteArrayInputStream; @@ -241,7 +242,6 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopConf; import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics; import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager; -import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -337,7 +337,7 @@ private void logAuditEvent(boolean succeeded, String cmd, String src) private void logAuditEvent(boolean succeeded, String cmd, String src, String dst, HdfsFileStatus stat) throws IOException { if (isAuditEnabled() && isExternalInvocation()) { - logAuditEvent(succeeded, getRemoteUser(), getRemoteIp(), + logAuditEvent(succeeded, Server.getRemoteUser(), Server.getRemoteIp(), cmd, src, dst, stat); } } @@ -989,6 +989,11 @@ private List initAuditLoggers(Configuration conf) { // Add audit logger to calculate top users if (topConf.isEnabled) { topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs); + if (DefaultMetricsSystem.instance().getSource( + TOPMETRICS_METRICS_SOURCE_NAME) == null) { + DefaultMetricsSystem.instance().register(TOPMETRICS_METRICS_SOURCE_NAME, + "Top N operations by user", topMetrics); + } auditLoggers.add(new TopAuditLogger(topMetrics)); } @@ -5256,17 +5261,9 @@ private AuthenticationMethod getConnectionAuthenticationMethod() * RPC call context even if the client exits. */ boolean isExternalInvocation() { - return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation(); + return Server.isRpcInvocation(); } - private static InetAddress getRemoteIp() { - InetAddress ip = Server.getRemoteIp(); - if (ip != null) { - return ip; - } - return NamenodeWebHdfsMethods.getRemoteIp(); - } - // optimize ugi lookup for RPC operations to avoid a trip through // UGI.getCurrentUser which is synch'ed private static UserGroupInformation getRemoteUser() throws IOException { @@ -6912,7 +6909,7 @@ public void logAuditEvent(boolean succeeded, String userName, sb.append(trackingId); } sb.append("\t").append("proto="); - sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc"); + sb.append(Server.getProtocol()); if (isCallerContextEnabled && callerContext != null && callerContext.isContextValid()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index ae7a9371240ef1..afedbb9839a8df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -64,7 +64,9 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.ipc.ExternalCall; import org.apache.hadoop.ipc.RefreshCallQueueProtocol; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -407,7 +409,15 @@ public FSNamesystem getNamesystem() { public NamenodeProtocols getRpcServer() { return rpcServer; } - + + public void queueExternalCall(ExternalCall extCall) + throws IOException, InterruptedException { + if (rpcServer == null) { + throw new RetriableException("Namenode is in startup mode"); + } + rpcServer.getClientRpcServer().queueCall(extCall); + } + public static void initMetrics(Configuration conf, NamenodeRole role) { metrics = NameNodeMetrics.create(conf, role); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 57f7cb197b67bb..a97a307aefd412 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -139,7 +139,6 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; -import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; @@ -1686,10 +1685,7 @@ private void verifySoftwareVersion(DatanodeRegistration dnReg) } private static String getClientMachine() { - String clientMachine = NamenodeWebHdfsMethods.getRemoteAddress(); - if (clientMachine == null) { //not a web client - clientMachine = Server.getRemoteAddress(); - } + String clientMachine = Server.getRemoteAddress(); if (clientMachine == null) { //not a RPC client clientMachine = ""; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java index ab553928862788..2719c8857ee006 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java @@ -17,24 +17,32 @@ */ package org.apache.hadoop.hdfs.server.namenode.top.metrics; -import java.net.InetAddress; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - import com.google.common.collect.Lists; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.namenode.top.TopConf; import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager; +import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op; +import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.lib.Interns; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow; /** @@ -58,8 +66,11 @@ * Thread-safe: relies on thread-safety of RollingWindowManager */ @InterfaceAudience.Private -public class TopMetrics { +public class TopMetrics implements MetricsSource { public static final Logger LOG = LoggerFactory.getLogger(TopMetrics.class); + public static final String TOPMETRICS_METRICS_SOURCE_NAME = + "NNTopUserOpCounts"; + private final boolean isMetricsSourceEnabled; private static void logConf(Configuration conf) { LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY + @@ -83,6 +94,8 @@ public TopMetrics(Configuration conf, int[] reportingPeriods) { rollingWindowManagers.put(reportingPeriods[i], new RollingWindowManager( conf, reportingPeriods[i])); } + isMetricsSourceEnabled = conf.getBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY, + DFSConfigKeys.NNTOP_ENABLED_DEFAULT); } /** @@ -128,4 +141,44 @@ public void report(long currTime, String userName, String cmd) { TopConf.ALL_CMDS, userName, 1); } } + + /** + * Flatten out the top window metrics into + * {@link org.apache.hadoop.metrics2.MetricsRecord}s for consumption by + * external metrics systems. Each metrics record added corresponds to the + * reporting period a.k.a window length of the configured rolling windows. + */ + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + if (!isMetricsSourceEnabled) { + return; + } + + for (final TopWindow window : getTopWindows()) { + MetricsRecordBuilder rb = collector.addRecord(buildOpRecordName(window)) + .setContext("dfs"); + for (final Op op: window.getOps()) { + rb.addCounter(buildOpTotalCountMetricsInfo(op), op.getTotalCount()); + for (User user : op.getTopUsers()) { + rb.addCounter(buildOpRecordMetricsInfo(op, user), user.getCount()); + } + } + } + } + + private String buildOpRecordName(TopWindow window) { + return TOPMETRICS_METRICS_SOURCE_NAME + ".windowMs=" + + window.getWindowLenMs(); + } + + private MetricsInfo buildOpTotalCountMetricsInfo(Op op) { + return Interns.info("op=" + StringUtils.deleteWhitespace(op.getOpType()) + + ".TotalCount", "Total operation count"); + } + + private MetricsInfo buildOpRecordMetricsInfo(Op op, User user) { + return Interns.info("op=" + StringUtils.deleteWhitespace(op.getOpType()) + + ".user=" + user.getUser() + + ".count", "Total operations performed by user"); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java index 3ab0c676e85bcc..4887e356fec798 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java @@ -25,10 +25,13 @@ import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.security.Principal; import java.security.PrivilegedExceptionAction; import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.concurrent.ExecutionException; import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; @@ -60,6 +63,7 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsCreateModes; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.XAttrHelper; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -81,8 +85,8 @@ import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.resources.*; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ExternalCall; import org.apache.hadoop.ipc.RetriableException; -import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.Node; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.Credentials; @@ -103,39 +107,39 @@ public class NamenodeWebHdfsMethods { public static final Log LOG = LogFactory.getLog(NamenodeWebHdfsMethods.class); private static final UriFsPathParam ROOT = new UriFsPathParam(""); - - private static final ThreadLocal REMOTE_ADDRESS = new ThreadLocal(); - - /** @return the remote client address. */ - public static String getRemoteAddress() { - return REMOTE_ADDRESS.get(); - } - - public static InetAddress getRemoteIp() { - try { - return InetAddress.getByName(getRemoteAddress()); - } catch (Exception e) { - return null; - } - } - /** - * Returns true if a WebHdfs request is in progress. Akin to - * {@link Server#isRpcInvocation()}. - */ - public static boolean isWebHdfsInvocation() { - return getRemoteAddress() != null; - } + private volatile Boolean useIpcCallq; + private String scheme; + private Principal userPrincipal; + private String remoteAddr; private @Context ServletContext context; - private @Context HttpServletRequest request; private @Context HttpServletResponse response; + public NamenodeWebHdfsMethods(@Context HttpServletRequest request) { + // the request object is a proxy to thread-locals so we have to extract + // what we want from it since the external call will be processed in a + // different thread. + scheme = request.getScheme(); + userPrincipal = request.getUserPrincipal(); + // get the remote address, if coming in via a trusted proxy server then + // the address with be that of the proxied client + remoteAddr = JspHelper.getRemoteAddr(request); + } + private void init(final UserGroupInformation ugi, final DelegationParam delegation, final UserParam username, final DoAsParam doAsUser, final UriFsPathParam path, final HttpOpParam op, final Param... parameters) { + if (useIpcCallq == null) { + Configuration conf = + (Configuration)context.getAttribute(JspHelper.CURRENT_CONF); + useIpcCallq = conf.getBoolean( + DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ, + DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ_DEFAULT); + } + if (LOG.isTraceEnabled()) { LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path + ", ugi=" + ugi + ", " + username + ", " + doAsUser @@ -144,16 +148,8 @@ private void init(final UserGroupInformation ugi, //clear content type response.setContentType(null); - - // set the remote address, if coming in via a trust proxy server then - // the address with be that of the proxied client - REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request)); } - private void reset() { - REMOTE_ADDRESS.set(null); - } - private static NamenodeProtocols getRPCServer(NameNode namenode) throws IOException { final NamenodeProtocols np = namenode.getRpcServer(); @@ -162,11 +158,63 @@ private static NamenodeProtocols getRPCServer(NameNode namenode) } return np; } - + + private T doAs(final UserGroupInformation ugi, + final PrivilegedExceptionAction action) + throws IOException, InterruptedException { + return useIpcCallq ? doAsExternalCall(ugi, action) : ugi.doAs(action); + } + + private T doAsExternalCall(final UserGroupInformation ugi, + final PrivilegedExceptionAction action) + throws IOException, InterruptedException { + // set the remote address, if coming in via a trust proxy server then + // the address with be that of the proxied client + ExternalCall call = new ExternalCall(action){ + @Override + public UserGroupInformation getRemoteUser() { + return ugi; + } + @Override + public String getProtocol() { + return "webhdfs"; + } + @Override + public String getHostAddress() { + return remoteAddr; + } + @Override + public InetAddress getHostInetAddress() { + try { + return InetAddress.getByName(getHostAddress()); + } catch (UnknownHostException e) { + return null; + } + } + }; + final NameNode namenode = (NameNode)context.getAttribute("name.node"); + namenode.queueExternalCall(call); + T result = null; + try { + result = call.get(); + } catch (ExecutionException ee) { + Throwable t = ee.getCause(); + if (t instanceof RuntimeException) { + throw (RuntimeException)t; + } else if (t instanceof IOException) { + throw (IOException)t; + } else { + throw new IOException(t); + } + } + return result; + } + @VisibleForTesting static DatanodeInfo chooseDatanode(final NameNode namenode, final String path, final HttpOpParam.Op op, final long openOffset, - final long blocksize, final String excludeDatanodes) throws IOException { + final long blocksize, final String excludeDatanodes, + final String remoteAddr) throws IOException { FSNamesystem fsn = namenode.getNamesystem(); if (fsn == null) { throw new IOException("Namesystem has not been intialized yet."); @@ -190,7 +238,7 @@ static DatanodeInfo chooseDatanode(final NameNode namenode, if (op == PutOpParam.Op.CREATE) { //choose a datanode near to client final DatanodeDescriptor clientNode = bm.getDatanodeManager( - ).getDatanodeByHost(getRemoteAddress()); + ).getDatanodeByHost(remoteAddr); if (clientNode != null) { final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS( path, clientNode, excludes, blocksize); @@ -253,7 +301,8 @@ private Token generateDelegationToken( return null; } final Token t = c.getAllTokens().iterator().next(); - Text kind = request.getScheme().equals("http") ? WebHdfsConstants.WEBHDFS_TOKEN_KIND + Text kind = scheme.equals("http") + ? WebHdfsConstants.WEBHDFS_TOKEN_KIND : WebHdfsConstants.SWEBHDFS_TOKEN_KIND; t.setKind(kind); return t; @@ -267,7 +316,7 @@ private URI redirectURI(final NameNode namenode, final Param... parameters) throws URISyntaxException, IOException { final DatanodeInfo dn; dn = chooseDatanode(namenode, path, op, openOffset, blocksize, - excludeDatanodes); + excludeDatanodes, remoteAddr); if (dn == null) { throw new IOException("Failed to find datanode, suggest to check cluster" + " health. excludeDatanodes=" + excludeDatanodes); @@ -283,7 +332,7 @@ private URI redirectURI(final NameNode namenode, } else { //generate a token final Token t = generateDelegationToken( - namenode, ugi, request.getUserPrincipal().getName()); + namenode, ugi, userPrincipal.getName()); delegationQuery = "&" + new DelegationParam(t.encodeToUrlString()); } final String query = op.toQueryString() + delegationQuery @@ -291,7 +340,6 @@ private URI redirectURI(final NameNode namenode, + Param.toSortedString("&", parameters); final String uripath = WebHdfsFileSystem.PATH_PREFIX + path; - final String scheme = request.getScheme(); int port = "http".equals(scheme) ? dn.getInfoPort() : dn .getInfoSecurePort(); final URI uri = new URI(scheme, null, dn.getHostName(), port, uripath, @@ -446,10 +494,9 @@ public Response put( xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes, createFlagParam, noredirect); - return ugi.doAs(new PrivilegedExceptionAction() { + return doAs(ugi, new PrivilegedExceptionAction() { @Override public Response run() throws IOException, URISyntaxException { - try { return put(ugi, delegation, username, doAsUser, path.getAbsolutePath(), op, destination, owner, group, permission, unmaskedPermission, overwrite, bufferSize, @@ -458,9 +505,6 @@ public Response run() throws IOException, URISyntaxException { aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes, createFlagParam, noredirect); - } finally { - reset(); - } } }); } @@ -703,16 +747,12 @@ public Response post( init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize, excludeDatanodes, newLength); - return ugi.doAs(new PrivilegedExceptionAction() { + return doAs(ugi, new PrivilegedExceptionAction() { @Override public Response run() throws IOException, URISyntaxException { - try { return post(ugi, delegation, username, doAsUser, path.getAbsolutePath(), op, concatSrcs, bufferSize, excludeDatanodes, newLength, noredirect); - } finally { - reset(); - } } }); } @@ -858,17 +898,13 @@ public Response get( renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction, tokenKind, tokenService, startAfter); - return ugi.doAs(new PrivilegedExceptionAction() { + return doAs(ugi, new PrivilegedExceptionAction() { @Override public Response run() throws IOException, URISyntaxException { - try { return get(ugi, delegation, username, doAsUser, path.getAbsolutePath(), op, offset, length, renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes, fsAction, tokenKind, tokenService, noredirect, startAfter); - } finally { - reset(); - } } }); } @@ -1138,15 +1174,11 @@ public Response delete( init(ugi, delegation, username, doAsUser, path, op, recursive, snapshotName); - return ugi.doAs(new PrivilegedExceptionAction() { + return doAs(ugi, new PrivilegedExceptionAction() { @Override public Response run() throws IOException { - try { return delete(ugi, delegation, username, doAsUser, path.getAbsolutePath(), op, recursive, snapshotName); - } finally { - reset(); - } } }); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 672b5974d2dc6a..84b51f6159bf93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4273,4 +4273,19 @@ consecutive warnings within this interval. + + httpfs.buffer.size + 4096 + + The size buffer to be used when creating or opening httpfs filesystem IO stream. + + + + + dfs.webhdfs.use.ipc.callq + true + Enables routing of webhdfs calls through rpc + call queue + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java index 41478516010989..ae0f0c274d11ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -89,7 +90,7 @@ public void setUp() throws Exception { // handle failures in the DFSClient pipeline quickly // (for cluster.shutdown(); fs.close() idiom) - conf.setInt("ipc.client.connect.max.retries", 1); + conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); } /* diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java index 9374ae8efe18d7..5a8a39a89e68ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -218,7 +219,7 @@ protected Configuration getConf(int numDataNodes) { conf.setInt("io.bytes.per.checksum", BLOCK_SIZE); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes); - conf.setInt("ipc.client.connect.max.retries", 0); + conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); // Set short retry timeouts so this test runs faster conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); return conf; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java index 26efce5b75c472..d7a2c811a59579 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java @@ -55,6 +55,7 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.LightWeightCache; import org.junit.After; @@ -111,19 +112,33 @@ public void cleanup() throws IOException { } } + static class DummyCall extends Server.Call { + private UserGroupInformation ugi; + + DummyCall(int callId, byte[] clientId) { + super(callId, 1, null, null, RpcKind.RPC_PROTOCOL_BUFFER, clientId); + try { + ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException ioe) { + } + } + @Override + public UserGroupInformation getRemoteUser() { + return ugi; + } + } /** Set the current Server RPC call */ public static void newCall() { - Server.Call call = new Server.Call(++callId, 1, null, null, - RpcKind.RPC_PROTOCOL_BUFFER, CLIENT_ID); + Server.Call call = new DummyCall(++callId, CLIENT_ID); Server.getCurCall().set(call); } public static void resetCall() { - Server.Call call = new Server.Call(RpcConstants.INVALID_CALL_ID, 1, null, - null, RpcKind.RPC_PROTOCOL_BUFFER, RpcConstants.DUMMY_CLIENT_ID); + Server.Call call = new DummyCall(RpcConstants.INVALID_CALL_ID, + RpcConstants.DUMMY_CLIENT_ID); Server.getCurCall().set(call); } - + private void concatSetup(String file1, String file2) throws Exception { DFSTestUtil.createFile(filesystem, new Path(file1), BlockSize, (short)1, 0L); DFSTestUtil.createFile(filesystem, new Path(file2), BlockSize, (short)1, 0L); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestTopMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestTopMetrics.java new file mode 100644 index 00000000000000..4d3a4f030e2a0f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestTopMetrics.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.metrics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.namenode.top.TopConf; +import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.lib.Interns; +import org.junit.Test; + +import static org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics.TOPMETRICS_METRICS_SOURCE_NAME; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Test for MetricsSource part of the {@link TopMetrics} impl. + */ +public class TestTopMetrics { + @Test + public void testPresence() { + Configuration conf = new Configuration(); + TopConf topConf = new TopConf(conf); + TopMetrics topMetrics = new TopMetrics(conf, + topConf.nntopReportingPeriodsMs); + // Dummy command + topMetrics.report("test", "listStatus"); + topMetrics.report("test", "listStatus"); + topMetrics.report("test", "listStatus"); + + MetricsRecordBuilder rb = getMetrics(topMetrics); + MetricsCollector mc = rb.parent(); + + verify(mc).addRecord(TOPMETRICS_METRICS_SOURCE_NAME + ".windowMs=60000"); + verify(mc).addRecord(TOPMETRICS_METRICS_SOURCE_NAME + ".windowMs=300000"); + verify(mc).addRecord(TOPMETRICS_METRICS_SOURCE_NAME + ".windowMs=1500000"); + + verify(rb, times(3)).addCounter(Interns.info("op=listStatus.TotalCount", + "Total operation count"), 3L); + verify(rb, times(3)).addCounter(Interns.info("op=*.TotalCount", + "Total operation count"), 3L); + + verify(rb, times(3)).addCounter(Interns.info("op=listStatus." + + "user=test.count", "Total operations performed by user"), 3L); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java index 15e1c04b817ad6..604bf791d59227 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.*; import java.io.IOException; +import java.net.InetAddress; import java.util.Arrays; import java.util.List; @@ -62,6 +63,9 @@ public class TestWebHdfsDataLocality { private static final String RACK1 = "/rack1"; private static final String RACK2 = "/rack2"; + private static final String LOCALHOST = + InetAddress.getLoopbackAddress().getHostName(); + @Rule public final ExpectedException exception = ExpectedException.none(); @@ -96,7 +100,8 @@ public void testDataLocality() throws Exception { //The chosen datanode must be the same as the client address final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( - namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null); + namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null, + LOCALHOST); Assert.assertEquals(ipAddr, chosen.getIpAddr()); } } @@ -121,19 +126,22 @@ public void testDataLocality() throws Exception { { //test GETFILECHECKSUM final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( - namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null); + namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null, + LOCALHOST); Assert.assertEquals(expected, chosen); } { //test OPEN final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( - namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null); + namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null, + LOCALHOST); Assert.assertEquals(expected, chosen); } { //test APPEND final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( - namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null); + namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null, + LOCALHOST); Assert.assertEquals(expected, chosen); } } finally { @@ -189,7 +197,7 @@ public void testExcludeDataNodes() throws Exception { { // test GETFILECHECKSUM final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, - sb.toString()); + sb.toString(), LOCALHOST); for (int j = 0; j <= i; j++) { Assert.assertNotEquals(locations[j].getHostName(), chosen.getHostName()); @@ -198,7 +206,8 @@ public void testExcludeDataNodes() throws Exception { { // test OPEN final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( - namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString()); + namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString(), + LOCALHOST); for (int j = 0; j <= i; j++) { Assert.assertNotEquals(locations[j].getHostName(), chosen.getHostName()); @@ -208,7 +217,7 @@ public void testExcludeDataNodes() throws Exception { { // test APPEND final DatanodeInfo chosen = NamenodeWebHdfsMethods .chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L, - blocksize, sb.toString()); + blocksize, sb.toString(), LOCALHOST); for (int j = 0; j <= i; j++) { Assert.assertNotEquals(locations[j].getHostName(), chosen.getHostName()); @@ -229,6 +238,6 @@ public void testChooseDatanodeBeforeNamesystemInit() throws Exception { exception.expect(IOException.class); exception.expectMessage("Namesystem has not been intialized yet."); NamenodeWebHdfsMethods.chooseDatanode(nn, "/path", PutOpParam.Op.CREATE, 0, - DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null); + DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null, LOCALHOST); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index 94ecb9e6517ecc..b49f73d622e746 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.util.ToolRunner; import org.junit.After; import org.junit.Before; @@ -363,6 +364,55 @@ public Boolean get() { }, 100, 100 * 100); } + @Test(timeout = 30000) + public void testPrintTopology() throws Exception { + redirectStream(); + + /* init conf */ + final Configuration dfsConf = new HdfsConfiguration(); + final File baseDir = new File( + PathUtils.getTestDir(getClass()), + GenericTestUtils.getMethodName()); + dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + + final int numDn = 4; + final String[] racks = { + "/d1/r1", "/d1/r2", + "/d2/r1", "/d2/r2"}; + + /* init cluster using topology */ + try (MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(dfsConf) + .numDataNodes(numDn).racks(racks).build()) { + + miniCluster.waitActive(); + assertEquals(numDn, miniCluster.getDataNodes().size()); + final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf); + + resetStream(); + final int ret = ToolRunner.run(dfsAdmin, new String[] {"-printTopology"}); + + /* collect outputs */ + final List outs = Lists.newArrayList(); + scanIntoList(out, outs); + + /* verify results */ + assertEquals(0, ret); + assertEquals( + "There should be three lines per Datanode: the 1st line is" + + " rack info, 2nd node info, 3rd empty line. The total" + + " should be as a result of 3 * numDn.", + 12, outs.size()); + assertThat(outs.get(0), + is(allOf(containsString("Rack:"), containsString("/d1/r1")))); + assertThat(outs.get(3), + is(allOf(containsString("Rack:"), containsString("/d1/r2")))); + assertThat(outs.get(6), + is(allOf(containsString("Rack:"), containsString("/d2/r1")))); + assertThat(outs.get(9), + is(allOf(containsString("Rack:"), containsString("/d2/r2")))); + } + } + @Test(timeout = 30000) public void testNameNodeGetReconfigurationStatus() throws IOException, InterruptedException, TimeoutException { diff --git a/hadoop-tools/hadoop-archives/src/site/markdown/HadoopArchives.md.vm b/hadoop-tools/hadoop-archives/src/site/markdown/HadoopArchives.md.vm index 8bbb1ea596d564..9d83ed9700c98d 100644 --- a/hadoop-tools/hadoop-archives/src/site/markdown/HadoopArchives.md.vm +++ b/hadoop-tools/hadoop-archives/src/site/markdown/HadoopArchives.md.vm @@ -157,6 +157,6 @@ Hadoop Archives and MapReduce Using Hadoop Archives in MapReduce is as easy as specifying a different input filesystem than the default file system. If you have a hadoop archive stored in HDFS in /user/zoo/foo.har then for using this archive for MapReduce input, - all you need to specify the input directory as har:///user/zoo/foo.har. Since + all you need is to specify the input directory as har:///user/zoo/foo.har. Since Hadoop Archives is exposed as a file system MapReduce will be able to use all the logical input files in Hadoop Archives as input. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 4d4335726832df..3bd0dcc56dbaed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -719,17 +719,29 @@ public static boolean isAclEnabled(Configuration conf) { + "leveldb-state-store.compaction-interval-secs"; public static final long DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS = 3600; - /** The maximum number of completed applications RM keeps. */ + /** + * The maximum number of completed applications RM keeps. By default equals + * to {@link #DEFAULT_RM_MAX_COMPLETED_APPLICATIONS}. + */ public static final String RM_MAX_COMPLETED_APPLICATIONS = RM_PREFIX + "max-completed-applications"; - public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000; + public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 1000; /** - * The maximum number of completed applications RM state store keeps, by - * default equals to DEFAULT_RM_MAX_COMPLETED_APPLICATIONS + * The maximum number of completed applications RM state store keeps. By + * default equals to value of {@link #RM_MAX_COMPLETED_APPLICATIONS}. */ public static final String RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS = RM_PREFIX + "state-store.max-completed-applications"; + /** + * The default value for + * {@code yarn.resourcemanager.state-store.max-completed-applications}. + * @deprecated This default value is ignored and will be removed in a future + * release. The default value of + * {@code yarn.resourcemanager.state-store.max-completed-applications} is the + * value of {@link #RM_MAX_COMPLETED_APPLICATIONS}. + */ + @Deprecated public static final int DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS = DEFAULT_RM_MAX_COMPLETED_APPLICATIONS; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 524afecd852389..f37c689ba6a7f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -417,7 +417,7 @@ the applications remembered in RM memory. Any values larger than ${yarn.resourcemanager.max-completed-applications} will be reset to ${yarn.resourcemanager.max-completed-applications}. - Note that this value impacts the RM recovery performance.Typically, + Note that this value impacts the RM recovery performance. Typically, a smaller value indicates better performance on RM recovery. yarn.resourcemanager.state-store.max-completed-applications @@ -687,7 +687,7 @@ The maximum number of completed applications RM keeps. yarn.resourcemanager.max-completed-applications - 10000 + 1000 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java index 72327e82e9c9e2..88d2e10256292f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,6 +40,8 @@ import java.io.IOException; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; @InterfaceAudience.Private @InterfaceStability.Unstable @@ -54,6 +57,10 @@ public class EmbeddedElectorService extends AbstractService private byte[] localActiveNodeInfo; private ActiveStandbyElector elector; + private long zkSessionTimeout; + private Timer zkDisconnectTimer; + @VisibleForTesting + final Object zkDisconnectLock = new Object(); EmbeddedElectorService(RMContext rmContext) { super(EmbeddedElectorService.class.getName()); @@ -80,7 +87,7 @@ protected void serviceInit(Configuration conf) YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH); String electionZNode = zkBasePath + "/" + clusterId; - long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS, + zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS, YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); List zkAcls = RMZKUtils.getZKAcls(conf); @@ -123,6 +130,8 @@ protected void serviceStop() throws Exception { @Override public void becomeActive() throws ServiceFailedException { + cancelDisconnectTimer(); + try { rmContext.getRMAdminService().transitionToActive(req); } catch (Exception e) { @@ -132,6 +141,8 @@ public void becomeActive() throws ServiceFailedException { @Override public void becomeStandby() { + cancelDisconnectTimer(); + try { rmContext.getRMAdminService().transitionToStandby(req); } catch (Exception e) { @@ -139,13 +150,49 @@ public void becomeStandby() { } } + /** + * Stop the disconnect timer. Any running tasks will be allowed to complete. + */ + private void cancelDisconnectTimer() { + synchronized (zkDisconnectLock) { + if (zkDisconnectTimer != null) { + zkDisconnectTimer.cancel(); + zkDisconnectTimer = null; + } + } + } + + /** + * When the ZK client loses contact with ZK, this method will be called to + * allow the RM to react. Because the loss of connection can be noticed + * before the session timeout happens, it is undesirable to transition + * immediately. Instead the method starts a timer that will wait + * {@link YarnConfiguration#RM_ZK_TIMEOUT_MS} milliseconds before + * initiating the transition into standby state. + */ @Override public void enterNeutralMode() { - /** - * Possibly due to transient connection issues. Do nothing. - * TODO: Might want to keep track of how long in this state and transition - * to standby. - */ + LOG.warn("Lost contact with Zookeeper. Transitioning to standby in " + + zkSessionTimeout + " ms if connection is not reestablished."); + + // If we've just become disconnected, start a timer. When the time's up, + // we'll transition to standby. + synchronized (zkDisconnectLock) { + if (zkDisconnectTimer == null) { + zkDisconnectTimer = new Timer("Zookeeper disconnect timer"); + zkDisconnectTimer.schedule(new TimerTask() { + @Override + public void run() { + synchronized (zkDisconnectLock) { + // Only run if the timer hasn't been cancelled + if (zkDisconnectTimer != null) { + becomeStandby(); + } + } + } + }, zkSessionTimeout); + } + } } @SuppressWarnings(value = "unchecked") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 7352a284c48ca2..c065b607795d55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -101,7 +101,7 @@ public RMAppManager(RMContext context, this.maxCompletedAppsInStateStore = conf.getInt( YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, - YarnConfiguration.DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS); + this.maxCompletedAppsInMemory); if (this.maxCompletedAppsInStateStore > this.maxCompletedAppsInMemory) { this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java index 20b1c0e0060705..bfd0b4e75eace2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java @@ -28,6 +28,14 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestRMEmbeddedElector extends ClientBaseWithFixes { private static final Log LOG = @@ -41,6 +49,14 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { private Configuration conf; private AtomicBoolean callbackCalled; + private enum SyncTestType { + ACTIVE, + STANDBY, + NEUTRAL, + ACTIVE_TIMING, + STANDBY_TIMING + } + @Before public void setup() throws IOException { conf = new YarnConfiguration(); @@ -79,6 +95,181 @@ public void testDeadlockShutdownBecomeActive() throws InterruptedException { LOG.info("Stopped RM"); } + /** + * Test that neutral mode plays well with all other transitions. + * + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + @Test + public void testCallbackSynchronization() + throws IOException, InterruptedException { + testCallbackSynchronization(SyncTestType.ACTIVE); + testCallbackSynchronization(SyncTestType.STANDBY); + testCallbackSynchronization(SyncTestType.NEUTRAL); + testCallbackSynchronization(SyncTestType.ACTIVE_TIMING); + testCallbackSynchronization(SyncTestType.STANDBY_TIMING); + } + + /** + * Helper method to test that neutral mode plays well with other transitions. + * + * @param type the type of test to run + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronization(SyncTestType type) + throws IOException, InterruptedException { + AdminService as = mock(AdminService.class); + RMContext rc = mock(RMContext.class); + Configuration myConf = new Configuration(conf); + + myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50); + when(rc.getRMAdminService()).thenReturn(as); + + EmbeddedElectorService ees = new EmbeddedElectorService(rc); + ees.init(myConf); + + ees.enterNeutralMode(); + + switch (type) { + case ACTIVE: + testCallbackSynchronizationActive(as, ees); + break; + case STANDBY: + testCallbackSynchronizationStandby(as, ees); + break; + case NEUTRAL: + testCallbackSynchronizationNeutral(as, ees); + break; + case ACTIVE_TIMING: + testCallbackSynchronizationTimingActive(as, ees); + break; + case STANDBY_TIMING: + testCallbackSynchronizationTimingStandby(as, ees); + break; + default: + fail("Unknown test type: " + type); + break; + } + } + + /** + * Helper method to test that neutral mode plays well with an active + * transition. + * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationActive(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + ees.becomeActive(); + + Thread.sleep(100); + + verify(as).transitionToActive(any()); + verify(as, never()).transitionToStandby(any()); + } + + /** + * Helper method to test that neutral mode plays well with a standby + * transition. + * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationStandby(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + ees.becomeStandby(); + + Thread.sleep(100); + + verify(as, atLeast(1)).transitionToStandby(any()); + verify(as, atMost(1)).transitionToStandby(any()); + } + + /** + * Helper method to test that neutral mode plays well with itself. + * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationNeutral(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + ees.enterNeutralMode(); + + Thread.sleep(100); + + verify(as, atLeast(1)).transitionToStandby(any()); + verify(as, atMost(1)).transitionToStandby(any()); + } + + /** + * Helper method to test that neutral mode does not race with an active + * transition. + * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationTimingActive(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + synchronized (ees.zkDisconnectLock) { + // Sleep while holding the lock so that the timer thread can't do + // anything when it runs. Sleep until we're pretty sure the timer thread + // has tried to run. + Thread.sleep(100); + // While still holding the lock cancel the timer by transitioning. This + // simulates a race where the callback goes to cancel the timer while the + // timer is trying to run. + ees.becomeActive(); + } + + // Sleep just a little more so that the timer thread can do whatever it's + // going to do, hopefully nothing. + Thread.sleep(50); + + verify(as).transitionToActive(any()); + verify(as, never()).transitionToStandby(any()); + } + + /** + * Helper method to test that neutral mode does not race with an active + * transition. + * + * @param as the admin service + * @param ees the embedded elector service + * @throws IOException if there's an issue transitioning + * @throws InterruptedException if interrupted + */ + private void testCallbackSynchronizationTimingStandby(AdminService as, + EmbeddedElectorService ees) throws IOException, InterruptedException { + synchronized (ees.zkDisconnectLock) { + // Sleep while holding the lock so that the timer thread can't do + // anything when it runs. Sleep until we're pretty sure the timer thread + // has tried to run. + Thread.sleep(100); + // While still holding the lock cancel the timer by transitioning. This + // simulates a race where the callback goes to cancel the timer while the + // timer is trying to run. + ees.becomeStandby(); + } + + // Sleep just a little more so that the timer thread can do whatever it's + // going to do, hopefully nothing. + Thread.sleep(50); + + verify(as, atLeast(1)).transitionToStandby(any()); + verify(as, atMost(1)).transitionToStandby(any()); + } + private class MockRMWithElector extends MockRM { private long delayMs = 0;