diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md index c7ab7ca4bd..0b3368ce7b 100644 --- a/external/storm-hdfs/README.md +++ b/external/storm-hdfs/README.md @@ -504,7 +504,7 @@ The following example creates an HDFS spout that reads text files from HDFS path ```java // Instantiate spout to read text files -HdfsSpout textReaderSpout = new HdfsSpout().setReaderType("text") +HdfsSpout textReaderSpout = new HdfsSpout().setReaderType(TextFileReader.class) .withOutputFields(TextFileReader.defaultFields) .setHdfsUri("hdfs://localhost:54310") // reqd .setSourceDir("/data/in") // reqd @@ -540,7 +540,7 @@ Only methods mentioned in **bold** are required. | Method | Alternative config name (deprecated) | Default | Description | |----------------------------|--------------------------------------|-------------|-------------| -| **.setReaderType()** |~~hdfsspout.reader.type~~ | | Determines which file reader to use. Set to 'seq' for reading sequence files or 'text' for text files. Set to a fully qualified class name if using a custom file reader class (that implements interface org.apache.storm.hdfs.spout.FileReader)| +| **.setReaderType()** |~~hdfsspout.reader.type~~ | | Determines which file reader to use. Set to 'org.apache.storm.hdfs.spout.SequenceFileReader' for reading sequence files or 'org.apache.storm.hdfs.spout.TextFileReader' for text files OR set to custom file reader class (that implements interface org.apache.storm.hdfs.spout.FileReader)| | **.withOutputFields()** | | | Sets the names for the output fields for the spout. The number of fields depends upon the reader being used. For convenience, built-in reader types expose a static member called `defaultFields` that can be used for setting this.| | **.setHdfsUri()** |~~hdfsspout.hdfs~~ | | HDFS URI for the hdfs Name node. Example: hdfs://namenodehost:8020| | **.setSourceDir()** |~~hdfsspout.source.dir~~ | | HDFS directory from where to read files. E.g. /data/inputdir| diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index b7627f2417..8914c9a41c 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.storm.Config; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -49,7 +50,8 @@ public class HdfsSpout extends BaseRichSpout { // user configurable private String hdfsUri; // required - private String readerType; // required + private String readerType; + private Class readerTypeClass; // required private Fields outputFields; // required private String sourceDir; // required @@ -112,9 +114,17 @@ public HdfsSpout setHdfsUri(String hdfsUri) { this.hdfsUri = hdfsUri; return this; } - + /** + * @deprecated use {@link #setReaderType(Class)} instead. + */ + @Deprecated public HdfsSpout setReaderType(String readerType) { - this.readerType = readerType; + this.readerType = readerType; + return this; + } + + public HdfsSpout setReaderType(Class readerTypeClass) { + this.readerTypeClass = readerTypeClass; return this; } @@ -409,9 +419,14 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect // Reader type config if ( readerType==null && conf.containsKey(Configs.READER_TYPE) ) { - readerType = conf.get(Configs.READER_TYPE).toString(); + String className = (String) conf.get(Configs.READER_TYPE); + try { + readerTypeClass = (Class) Class.forName(className); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Unable to instantiate " + className, e); + } } - checkValidReader(readerType); + // -- source dir config if ( sourceDir==null && conf.containsKey(Configs.SOURCE_DIR) ) { @@ -531,22 +546,6 @@ private String getDefaultLockDir(Path sourceDirPath) { return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR; } - private static void checkValidReader(String readerType) { - if ( readerType.equalsIgnoreCase(Configs.TEXT) || readerType.equalsIgnoreCase(Configs.SEQ) ) - return; - try { - Class classType = Class.forName(readerType); - classType.getConstructor(FileSystem.class, Path.class, Map.class); - return; - } catch (ClassNotFoundException e) { - LOG.error(readerType + " not found in classpath.", e); - throw new IllegalArgumentException(readerType + " not found in classpath.", e); - } catch (NoSuchMethodException e) { - LOG.error(readerType + " is missing the expected constructor for Readers.", e); - throw new IllegalArgumentException(readerType + " is missing the expected constuctor for Readers."); - } - } - @Override public void ack(Object msgId) { LOG.trace("Ack received for msg {} on spout {}", msgId, spoutId); @@ -696,18 +695,13 @@ private boolean hasExpired(long lastModifyTime) { */ private FileReader createFileReader(Path file) throws IOException { - if ( readerType.equalsIgnoreCase(Configs.SEQ) ) { - return new SequenceFileReader(this.hdfs, file, conf); - } - if ( readerType.equalsIgnoreCase(Configs.TEXT) ) { - return new TextFileReader(this.hdfs, file, conf); - } try { - Class clsType = Class.forName(readerType); - Constructor constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class); + if(StringUtils.isNotBlank(readerType)){ + readerTypeClass = (Class) Class.forName(readerType); + } + Constructor constructor = readerTypeClass.getConstructor(FileSystem.class, Path.class, Map.class); return (FileReader) constructor.newInstance(this.hdfs, file, conf); } catch (Exception e) { - LOG.error(e.getMessage(), e); throw new RuntimeException("Unable to instantiate " + readerType + " reader", e); } } @@ -722,19 +716,13 @@ private FileReader createFileReader(Path file) */ private FileReader createFileReader(Path file, String offset) throws IOException { - if ( readerType.equalsIgnoreCase(Configs.SEQ) ) { - return new SequenceFileReader(this.hdfs, file, conf, offset); - } - if ( readerType.equalsIgnoreCase(Configs.TEXT) ) { - return new TextFileReader(this.hdfs, file, conf, offset); - } - try { - Class clsType = Class.forName(readerType); - Constructor constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class, String.class); + if(StringUtils.isNotBlank(readerType)){ + readerTypeClass = (Class) Class.forName(readerType); + } + Constructor constructor = readerTypeClass.getConstructor(FileSystem.class, Path.class, Map.class, String.class); return (FileReader) constructor.newInstance(this.hdfs, file, conf, offset); } catch (Exception e) { - LOG.error(e.getMessage(), e); throw new RuntimeException("Unable to instantiate " + readerType, e); } } diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java index f60cbf3e31..264ccca4be 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java @@ -95,7 +95,7 @@ public static void teardownClass() throws IOException { @Before public void setup() throws Exception { - baseFolder = tempFolder.newFolder("hdfsspout"); + baseFolder = tempFolder.newFolder("HdfsSpout"); source = new Path(baseFolder.toString() + "/source"); fs.mkdirs(source); archive = new Path(baseFolder.toString() + "/archive"); @@ -117,7 +117,7 @@ public void testSimpleText_noACK() throws IOException { Path file2 = new Path(source.toString() + "/file2.txt"); createTextFile(file2, 5); - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HdfsSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1); @@ -139,7 +139,7 @@ public void testSimpleText_ACK() throws IOException { Path file2 = new Path(source.toString() + "/file2.txt"); createTextFile(file2, 5); - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HdfsSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1); @@ -165,13 +165,13 @@ public void testResumeAbandoned_Text_NoAck() throws Exception { final Integer lockExpirySec = 1; - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HdfsSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1000); // effectively disable commits based on time spout.setLockTimeoutSec(lockExpirySec); - HdfsSpout spout2 = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HdfsSpout spout2 = makeSpout(TextFileReader.class); spout2.setCommitFrequencyCount(1); spout2.setCommitFrequencySec(1000); // effectively disable commits based on time spout2.setLockTimeoutSec(lockExpirySec); @@ -222,13 +222,13 @@ public void testResumeAbandoned_Seq_NoAck() throws Exception { final Integer lockExpirySec = 1; - HdfsSpout spout = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields); + HdfsSpout spout = makeSpout(SequenceFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1000); // effectively disable commits based on time spout.setLockTimeoutSec(lockExpirySec); - HdfsSpout spout2 = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields); + HdfsSpout spout2 = makeSpout(SequenceFileReader.class); spout2.setCommitFrequencyCount(1); spout2.setCommitFrequencySec(1000); // effectively disable commits based on time spout2.setLockTimeoutSec(lockExpirySec); @@ -344,7 +344,7 @@ public void testMultipleFileConsumption_Ack() throws Exception { createTextFile(file1, 5); - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HdfsSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1); @@ -414,9 +414,9 @@ private static boolean getBoolField(HdfsSpout spout, String fieldName) throws No @Test public void testSimpleSequenceFile() throws IOException { //1) create a couple files to consume - source = new Path("/tmp/hdfsspout/source"); + source = new Path("/tmp/HdfsSpout/source"); fs.mkdirs(source); - archive = new Path("/tmp/hdfsspout/archive"); + archive = new Path("/tmp/HdfsSpout/archive"); fs.mkdirs(archive); Path file1 = new Path(source + "/file1.seq"); @@ -426,7 +426,7 @@ public void testSimpleSequenceFile() throws IOException { createSeqFile(fs, file2, 5); - HdfsSpout spout = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields); + HdfsSpout spout = makeSpout(SequenceFileReader.class); Map conf = getCommonConfigs(); openSpout(spout, 0, conf); @@ -454,7 +454,7 @@ public void testReadFailures() throws Exception { Assert.assertEquals(2, listDir(source).size()); // 2) run spout - HdfsSpout spout = makeSpout(MockTextFailingReader.class.getName(), MockTextFailingReader.defaultFields); + HdfsSpout spout = makeSpout(MockTextFailingReader.class); Map conf = getCommonConfigs(); openSpout(spout, 0, conf); @@ -476,7 +476,7 @@ public void testLocking() throws Exception { // 0) config spout to log progress in lock file for each tuple - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HdfsSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(1); spout.setCommitFrequencySec(1000); // effectively disable commits based on time @@ -527,7 +527,7 @@ public void testLockLoggingFreqCount() throws Exception { // 0) config spout to log progress in lock file for each tuple - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HdfsSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(2); // 1 lock log entry every 2 tuples spout.setCommitFrequencySec(1000); // Effectively disable commits based on time @@ -554,7 +554,7 @@ public void testLockLoggingFreqSec() throws Exception { createTextFile(file1, 10); // 0) config spout to log progress in lock file for each tuple - HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields); + HdfsSpout spout = makeSpout(TextFileReader.class); spout.setCommitFrequencyCount(0); // disable it spout.setCommitFrequencySec(2); // log every 2 sec @@ -594,9 +594,8 @@ private Map getCommonConfigs() { return conf; } - private HdfsSpout makeSpout(String readerType, String[] outputFields) { - HdfsSpout spout = new HdfsSpout().withOutputFields(outputFields) - .setReaderType(readerType) + private HdfsSpout makeSpout(Class readerType) { + HdfsSpout spout = new HdfsSpout().setReaderType(readerType) .setHdfsUri(hdfsCluster.getURI().toString()) .setSourceDir(source.toString()) .setArchiveDir(archive.toString())