Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions external/storm-hdfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,5 @@
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,41 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.hdfs.bolt;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.rotation.ClosingFilesPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.hdfs.common.NullPartitioner;
import org.apache.storm.hdfs.common.Partitioner;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.apache.storm.hdfs.security.HdfsSecurityUtil;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

public abstract class AbstractHdfsBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class);
private static final Integer DEFAULT_RETRY_COUNT = 3;
Expand All @@ -56,7 +59,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
private static final Integer DEFAULT_MAX_OPEN_FILES = 50;

protected Map<String, Writer> writers;
private Map<String, Writer> writers;
protected Map<String, Integer> rotationCounterMap = new HashMap<>();
protected List<RotationAction> rotationActions = new ArrayList<>();
protected OutputCollector collector;
Expand All @@ -74,6 +77,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
protected Integer tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS;
protected Integer maxOpenFiles = DEFAULT_MAX_OPEN_FILES;
protected Partitioner partitioner = new NullPartitioner();
protected ClosingFilesPolicy closingFilesPolicy;

protected transient Configuration hdfsConfig;

Expand All @@ -94,14 +98,18 @@ protected void rotateOutputFile(Writer writer) throws IOException {

/**
* Marked as final to prevent override. Subclasses should implement the doPrepare() method.
* @param conf
* @param topologyContext
* @param collector
* @param conf configuration param
* @param topologyContext topology context
* @param collector Output collector
*/
public final void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector){
public final void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) {
this.writeLock = new Object();
if (this.syncPolicy == null) throw new IllegalStateException("SyncPolicy must be specified.");
if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
if (this.syncPolicy == null) {
throw new IllegalStateException("SyncPolicy must be specified.");
}
if (this.rotationPolicy == null) {
throw new IllegalStateException("RotationPolicy must be specified.");
}
if (this.fsUrl == null) {
throw new IllegalStateException("File system URL must be specified.");
}
Expand All @@ -112,20 +120,20 @@ public final void prepare(Map<String, Object> conf, TopologyContext topologyCont
this.fileNameFormat.prepare(conf, topologyContext);
this.hdfsConfig = new Configuration();
Map<String, Object> map = (Map<String, Object>)conf.get(this.configKey);
if(map != null){
for(String key : map.keySet()){
if (map != null) {
for (String key : map.keySet()) {
this.hdfsConfig.set(key, String.valueOf(map.get(key)));
}
}

try{
try {
HdfsSecurityUtil.login(conf, hdfsConfig);
doPrepare(conf, topologyContext, collector);
} catch (Exception e){
} catch (Exception e) {
throw new RuntimeException("Error preparing HdfsBolt: " + e.getMessage(), e);
}

if(this.rotationPolicy instanceof TimedRotationPolicy){
if (this.rotationPolicy instanceof TimedRotationPolicy) {
startTimedRotationPolicy();
}
}
Expand All @@ -142,6 +150,7 @@ public final void execute(Tuple tuple) {
LOG.debug("TICK! forcing a file system flush");
this.collector.ack(tuple);
forceSync = true;
checkClosingPolicy();
} else {

writerKey = getHashKeyForTuple(tuple);
Expand Down Expand Up @@ -209,18 +218,20 @@ private Writer getOrCreateWriter(String writerKey, Tuple tuple) throws IOExcepti
Path pathForNextFile = getBasePathForNextFile(tuple);
writer = makeNewWriter(pathForNextFile, tuple);
writers.put(writerKey, writer);
} else if (this.closingFilesPolicy != null) {
writer.resetClosingPolicy();
}
return writer;
}

/**
* A tuple must be mapped to a writer based on two factors:
* A tuple must be mapped to a writer based on two factors.
* - bolt specific logic that must separate tuples into different files in the same directory (see the avro bolt
* for an example of this)
* - the directory the tuple will be partioned into
* for an example of this).
* - the directory the tuple will be partitioned into.
*
* @param tuple
* @return
* @param tuple tuple value
* @return hash key for tuple
*/
private String getHashKeyForTuple(Tuple tuple) {
final String boltKey = getWriterKey(tuple);
Expand All @@ -243,6 +254,30 @@ void doRotationAndRemoveWriter(String writerKey, Writer writer) {
}
}

/**
 * Applies the optional closing-files policy to every currently open writer.
 *
 * <p>Called from {@code execute} when a tick tuple arrives. Does nothing when no
 * {@code closingFilesPolicy} has been configured. Otherwise each writer has its closing policy
 * updated, and any writer that then reports {@code needsRotation()} is rotated and removed from
 * the {@code writers} map.
 */
private void checkClosingPolicy() {
    if (closingFilesPolicy == null) {
        return;
    }
    LOG.debug("Closing policy exists");
    Iterator<Map.Entry<String, Writer>> iterator = writers.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<String, Writer> entry = iterator.next();
        Writer hdfswriter = entry.getValue();
        LOG.debug("Checking writer key: {}", entry.getKey());
        hdfswriter.updateClosingPolicy();
        if (!hdfswriter.needsRotation()) {
            continue;
        }
        try {
            rotateOutputFile(hdfswriter);
        } catch (IOException e) {
            this.collector.reportError(e);
            // Keep the cause and the affected file in the log; the bare message hid both.
            LOG.error("File could not be rotated: {}", hdfswriter.getFilePath(), e);
        } finally {
            // Remove through the iterator (safe while iterating) whether or not rotation succeeded,
            // so a writer that failed to rotate is not reused.
            iterator.remove();
        }
    }
}

@Override
public Map<String, Object> getComponentConfiguration() {
return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), tickTupleInterval);
Expand Down Expand Up @@ -292,8 +327,7 @@ protected Path getBasePathForNextFile(Tuple tuple) {

final String partitionPath = this.partitioner.getPartitionPath(tuple);
final int rotation;
if (rotationCounterMap.containsKey(partitionPath))
{
if (rotationCounterMap.containsKey(partitionPath)) {
rotation = rotationCounterMap.get(partitionPath) + 1;
} else {
rotation = 0;
Expand All @@ -304,11 +338,13 @@ protected Path getBasePathForNextFile(Tuple tuple) {
this.fileNameFormat.getName(rotation, System.currentTimeMillis()));
}

abstract protected void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws IOException;
protected abstract void doPrepare(Map<String, Object> conf,
TopologyContext topologyContext,
OutputCollector collector) throws IOException;

abstract protected String getWriterKey(Tuple tuple);
protected abstract String getWriterKey(Tuple tuple);

abstract protected Writer makeNewWriter(Path path, Tuple tuple) throws IOException;
protected abstract Writer makeNewWriter(Path path, Tuple tuple) throws IOException;

static class WritersMap extends LinkedHashMap<String, Writer> {
final long maxWriters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.storm.hdfs.bolt;

import org.apache.storm.hdfs.bolt.rotation.ClosingFilesPolicy;
import org.apache.storm.hdfs.common.AbstractHDFSWriter;
import org.apache.storm.hdfs.common.AvroGenericRecordHDFSWriter;
import org.apache.storm.hdfs.common.Partitioner;
Expand Down Expand Up @@ -87,6 +88,13 @@ public AvroGenericRecordBolt withPartitioner(Partitioner partitioner) {
return this;
}


/**
 * Configures the policy that defines when an open file should be closed.
 *
 * @param policy closing-files policy to store on this bolt
 * @return this bolt, so that configuration calls can be chained
 */
public AvroGenericRecordBolt withClosingFilesPolicy(ClosingFilesPolicy policy) {
    this.closingFilesPolicy = policy;
    return this;
}

@Override
protected void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
LOG.info("Preparing AvroGenericRecord Bolt...");
Expand All @@ -107,6 +115,11 @@ protected String getWriterKey(Tuple tuple) {
@Override
protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException {
Schema recordSchema = ((GenericRecord) tuple.getValue(0)).getSchema();
return new AvroGenericRecordHDFSWriter(this.rotationPolicy, path, this.fs.create(path), recordSchema);

AbstractHDFSWriter writer =new AvroGenericRecordHDFSWriter(this.rotationPolicy, path, this.fs.create(path), recordSchema);
if(closingFilesPolicy!=null)
writer = writer.withClosingFilesPolicy(closingFilesPolicy.copy());

return writer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public interface Writer {

void close() throws IOException;

void updateClosingPolicy();

void resetClosingPolicy();

boolean needsRotation();

Path getFilePath();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.storm.hdfs.bolt.rotation;

import java.io.Serializable;

/**
 * Policy that defines when a particular file should be closed.
 *
 * <p>Implementations may carry mutable state (see {@link #reset()}); {@link #copy()} exists so
 * that a caller can obtain a separate instance instead of sharing one.
 */
public interface ClosingFilesPolicy extends Serializable {

    /**
     * Reports whether the file governed by this policy should be closed now.
     *
     * <p>Implementations may update internal state on each call, so callers should not assume
     * that repeated calls return the same answer.
     *
     * @return {@code true} if the writer should be closed, {@code false} otherwise
     */
    boolean closeWriter();

    /**
     * Clears any state this policy has accumulated.
     */
    void reset();

    /**
     * Creates another instance of this policy.
     *
     * @return a policy instance distinct from shared state — presumably freshly initialised;
     *         confirm against the concrete implementation
     */
    ClosingFilesPolicy copy();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.apache.storm.hdfs.bolt.rotation;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Closing-files policy driven by a counter that is intended to advance once per tick tuple.
 *
 * <p>Every call to {@link #closeWriter()} increments the counter. Once the counter is strictly
 * greater than the configured threshold, the policy reports that the file should be closed and
 * clears the counter. {@link #reset()} clears the counter without closing anything.
 */
public class TickTupleBasedClosingPolicy implements ClosingFilesPolicy {
    private static final Logger LOG = LoggerFactory.getLogger(TickTupleBasedClosingPolicy.class);
    private final int threshold;
    private int count;

    /**
     * Creates a policy whose counter starts at zero.
     *
     * @param threshold number of {@link #closeWriter()} calls the counter must exceed before the
     *                  policy asks for the file to be closed
     */
    public TickTupleBasedClosingPolicy(int threshold) {
        this.threshold = threshold;
    }

    /**
     * Increments the counter and checks it against the threshold.
     *
     * @return {@code true} (after clearing the counter) when the incremented counter is greater
     *         than the threshold, {@code false} otherwise
     */
    @Override
    public boolean closeWriter() {
        // Logged before the increment, so "count" is the value left by the previous call.
        LOG.debug("Threshold: {}, count: {}", threshold, count);
        this.count++;

        if (this.count > this.threshold) {
            LOG.info("Count {} exceeded threshold {}", count, threshold);
            reset();
            return true;
        }
        return false;
    }

    /**
     * Sets the counter back to zero.
     *
     * <p>NOTE(review): intended to be invoked whenever a tuple is consumed by the writer that owns
     * this policy — confirm against the caller.
     */
    @Override
    public void reset() {
        this.count = 0;
    }

    /**
     * Creates a new policy with the same threshold and a counter starting at zero.
     *
     * @return an independent policy instance
     */
    @Override
    public ClosingFilesPolicy copy() {
        return new TickTupleBasedClosingPolicy(this.threshold);
    }
}
Loading