Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions external/storm-hdfs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ The following example creates an HDFS spout that reads text files from HDFS path

```java
// Instantiate spout to read text files
HdfsSpout textReaderSpout = new HdfsSpout().setReaderType("text")
HdfsSpout textReaderSpout = new HdfsSpout().setReaderType(TextFileReader.class)
.withOutputFields(TextFileReader.defaultFields)
.setHdfsUri("hdfs://localhost:54310") // reqd
.setSourceDir("/data/in") // reqd
Expand Down Expand Up @@ -540,7 +540,7 @@ Only methods mentioned in **bold** are required.

| Method | Alternative config name (deprecated) | Default | Description |
|----------------------------|--------------------------------------|-------------|-------------|
| **.setReaderType()** |~~hdfsspout.reader.type~~ | | Determines which file reader to use. Set to 'seq' for reading sequence files or 'text' for text files. Set to a fully qualified class name if using a custom file reader class (that implements interface org.apache.storm.hdfs.spout.FileReader)|
| **.setReaderType()** |~~hdfsspout.reader.type~~ | | Determines which file reader to use. Set to 'org.apache.storm.hdfs.spout.SequenceFileReader' for reading sequence files or 'org.apache.storm.hdfs.spout.TextFileReader' for text files OR set to custom file reader class (that implements interface org.apache.storm.hdfs.spout.FileReader)|
| **.withOutputFields()** | | | Sets the names for the output fields for the spout. The number of fields depends upon the reader being used. For convenience, built-in reader types expose a static member called `defaultFields` that can be used for setting this.|
| **.setHdfsUri()** |~~hdfsspout.hdfs~~ | | HDFS URI for the hdfs Name node. Example: hdfs://namenodehost:8020|
| **.setSourceDir()** |~~hdfsspout.source.dir~~ | | HDFS directory from where to read files. E.g. /data/inputdir|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.storm.Config;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -49,7 +50,8 @@ public class HdfsSpout extends BaseRichSpout {

// user configurable
private String hdfsUri; // required
private String readerType; // required
private String readerType;
private Class<? extends FileReader> readerTypeClass; // required
private Fields outputFields; // required

private String sourceDir; // required
Expand Down Expand Up @@ -112,9 +114,17 @@ public HdfsSpout setHdfsUri(String hdfsUri) {
this.hdfsUri = hdfsUri;
return this;
}

/**
* @deprecated use {@link #setReaderType(Class)} instead.
*/
@Deprecated
public HdfsSpout setReaderType(String readerType) {
this.readerType = readerType;
this.readerType = readerType;
return this;
}

public HdfsSpout setReaderType(Class<? extends FileReader> readerTypeClass) {
this.readerTypeClass = readerTypeClass;
return this;
}

Expand Down Expand Up @@ -409,9 +419,14 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect

// Reader type config
if ( readerType==null && conf.containsKey(Configs.READER_TYPE) ) {
readerType = conf.get(Configs.READER_TYPE).toString();
String className = (String) conf.get(Configs.READER_TYPE);
try {
readerTypeClass = (Class<? extends FileReader>) Class.forName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Unable to instantiate " + className, e);
}
}
checkValidReader(readerType);


// -- source dir config
if ( sourceDir==null && conf.containsKey(Configs.SOURCE_DIR) ) {
Expand Down Expand Up @@ -531,22 +546,6 @@ private String getDefaultLockDir(Path sourceDirPath) {
return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR;
}

private static void checkValidReader(String readerType) {
if ( readerType.equalsIgnoreCase(Configs.TEXT) || readerType.equalsIgnoreCase(Configs.SEQ) )
return;
try {
Class<?> classType = Class.forName(readerType);
classType.getConstructor(FileSystem.class, Path.class, Map.class);
return;
} catch (ClassNotFoundException e) {
LOG.error(readerType + " not found in classpath.", e);
throw new IllegalArgumentException(readerType + " not found in classpath.", e);
} catch (NoSuchMethodException e) {
LOG.error(readerType + " is missing the expected constructor for Readers.", e);
throw new IllegalArgumentException(readerType + " is missing the expected constuctor for Readers.");
}
}

@Override
public void ack(Object msgId) {
LOG.trace("Ack received for msg {} on spout {}", msgId, spoutId);
Expand Down Expand Up @@ -696,18 +695,13 @@ private boolean hasExpired(long lastModifyTime) {
*/
private FileReader createFileReader(Path file)
throws IOException {
if ( readerType.equalsIgnoreCase(Configs.SEQ) ) {
return new SequenceFileReader(this.hdfs, file, conf);
}
if ( readerType.equalsIgnoreCase(Configs.TEXT) ) {
return new TextFileReader(this.hdfs, file, conf);
}
try {
Class<?> clsType = Class.forName(readerType);
Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class);
if(StringUtils.isNotBlank(readerType)){
readerTypeClass = (Class<? extends FileReader>) Class.forName(readerType);
}
Constructor<?> constructor = readerTypeClass.getConstructor(FileSystem.class, Path.class, Map.class);
return (FileReader) constructor.newInstance(this.hdfs, file, conf);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException("Unable to instantiate " + readerType + " reader", e);
}
}
Expand All @@ -722,19 +716,13 @@ private FileReader createFileReader(Path file)
*/
private FileReader createFileReader(Path file, String offset)
throws IOException {
if ( readerType.equalsIgnoreCase(Configs.SEQ) ) {
return new SequenceFileReader(this.hdfs, file, conf, offset);
}
if ( readerType.equalsIgnoreCase(Configs.TEXT) ) {
return new TextFileReader(this.hdfs, file, conf, offset);
}

try {
Class<?> clsType = Class.forName(readerType);
Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class, String.class);
if(StringUtils.isNotBlank(readerType)){
readerTypeClass = (Class<? extends FileReader>) Class.forName(readerType);
}
Constructor<?> constructor = readerTypeClass.getConstructor(FileSystem.class, Path.class, Map.class, String.class);
return (FileReader) constructor.newInstance(this.hdfs, file, conf, offset);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException("Unable to instantiate " + readerType, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public static void teardownClass() throws IOException {

@Before
public void setup() throws Exception {
baseFolder = tempFolder.newFolder("hdfsspout");
baseFolder = tempFolder.newFolder("HdfsSpout");
source = new Path(baseFolder.toString() + "/source");
fs.mkdirs(source);
archive = new Path(baseFolder.toString() + "/archive");
Expand All @@ -117,7 +117,7 @@ public void testSimpleText_noACK() throws IOException {
Path file2 = new Path(source.toString() + "/file2.txt");
createTextFile(file2, 5);

HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields);
HdfsSpout spout = makeSpout(TextFileReader.class);
spout.setCommitFrequencyCount(1);
spout.setCommitFrequencySec(1);

Expand All @@ -139,7 +139,7 @@ public void testSimpleText_ACK() throws IOException {
Path file2 = new Path(source.toString() + "/file2.txt");
createTextFile(file2, 5);

HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields);
HdfsSpout spout = makeSpout(TextFileReader.class);
spout.setCommitFrequencyCount(1);
spout.setCommitFrequencySec(1);

Expand All @@ -165,13 +165,13 @@ public void testResumeAbandoned_Text_NoAck() throws Exception {

final Integer lockExpirySec = 1;

HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields);
HdfsSpout spout = makeSpout(TextFileReader.class);
spout.setCommitFrequencyCount(1);
spout.setCommitFrequencySec(1000); // effectively disable commits based on time
spout.setLockTimeoutSec(lockExpirySec);


HdfsSpout spout2 = makeSpout(Configs.TEXT, TextFileReader.defaultFields);
HdfsSpout spout2 = makeSpout(TextFileReader.class);
spout2.setCommitFrequencyCount(1);
spout2.setCommitFrequencySec(1000); // effectively disable commits based on time
spout2.setLockTimeoutSec(lockExpirySec);
Expand Down Expand Up @@ -222,13 +222,13 @@ public void testResumeAbandoned_Seq_NoAck() throws Exception {

final Integer lockExpirySec = 1;

HdfsSpout spout = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields);
HdfsSpout spout = makeSpout(SequenceFileReader.class);
spout.setCommitFrequencyCount(1);
spout.setCommitFrequencySec(1000); // effectively disable commits based on time
spout.setLockTimeoutSec(lockExpirySec);


HdfsSpout spout2 = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields);
HdfsSpout spout2 = makeSpout(SequenceFileReader.class);
spout2.setCommitFrequencyCount(1);
spout2.setCommitFrequencySec(1000); // effectively disable commits based on time
spout2.setLockTimeoutSec(lockExpirySec);
Expand Down Expand Up @@ -344,7 +344,7 @@ public void testMultipleFileConsumption_Ack() throws Exception {
createTextFile(file1, 5);


HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields);
HdfsSpout spout = makeSpout(TextFileReader.class);
spout.setCommitFrequencyCount(1);
spout.setCommitFrequencySec(1);

Expand Down Expand Up @@ -414,9 +414,9 @@ private static boolean getBoolField(HdfsSpout spout, String fieldName) throws No
@Test
public void testSimpleSequenceFile() throws IOException {
//1) create a couple files to consume
source = new Path("/tmp/hdfsspout/source");
source = new Path("/tmp/HdfsSpout/source");
fs.mkdirs(source);
archive = new Path("/tmp/hdfsspout/archive");
archive = new Path("/tmp/HdfsSpout/archive");
fs.mkdirs(archive);

Path file1 = new Path(source + "/file1.seq");
Expand All @@ -426,7 +426,7 @@ public void testSimpleSequenceFile() throws IOException {
createSeqFile(fs, file2, 5);


HdfsSpout spout = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields);
HdfsSpout spout = makeSpout(SequenceFileReader.class);
Map conf = getCommonConfigs();
openSpout(spout, 0, conf);

Expand Down Expand Up @@ -454,7 +454,7 @@ public void testReadFailures() throws Exception {
Assert.assertEquals(2, listDir(source).size());

// 2) run spout
HdfsSpout spout = makeSpout(MockTextFailingReader.class.getName(), MockTextFailingReader.defaultFields);
HdfsSpout spout = makeSpout(MockTextFailingReader.class);
Map conf = getCommonConfigs();
openSpout(spout, 0, conf);

Expand All @@ -476,7 +476,7 @@ public void testLocking() throws Exception {

// 0) config spout to log progress in lock file for each tuple

HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields);
HdfsSpout spout = makeSpout(TextFileReader.class);
spout.setCommitFrequencyCount(1);
spout.setCommitFrequencySec(1000); // effectively disable commits based on time

Expand Down Expand Up @@ -527,7 +527,7 @@ public void testLockLoggingFreqCount() throws Exception {

// 0) config spout to log progress in lock file for each tuple

HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields);
HdfsSpout spout = makeSpout(TextFileReader.class);
spout.setCommitFrequencyCount(2); // 1 lock log entry every 2 tuples
spout.setCommitFrequencySec(1000); // Effectively disable commits based on time

Expand All @@ -554,7 +554,7 @@ public void testLockLoggingFreqSec() throws Exception {
createTextFile(file1, 10);

// 0) config spout to log progress in lock file for each tuple
HdfsSpout spout = makeSpout(Configs.TEXT, TextFileReader.defaultFields);
HdfsSpout spout = makeSpout(TextFileReader.class);
spout.setCommitFrequencyCount(0); // disable it
spout.setCommitFrequencySec(2); // log every 2 sec

Expand Down Expand Up @@ -594,9 +594,8 @@ private Map getCommonConfigs() {
return conf;
}

private HdfsSpout makeSpout(String readerType, String[] outputFields) {
HdfsSpout spout = new HdfsSpout().withOutputFields(outputFields)
.setReaderType(readerType)
private HdfsSpout makeSpout(Class<? extends AbstractFileReader> readerType) {
HdfsSpout spout = new HdfsSpout().setReaderType(readerType)
.setHdfsUri(hdfsCluster.getURI().toString())
.setSourceDir(source.toString())
.setArchiveDir(archive.toString())
Expand Down