Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions external/storm-hdfs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,22 @@ If you are using Trident and sequence files you can do something like this:
```


### Proxy User for HDFS interaction
The HDFS bolt implementation now allows you to interact with HDFS as a
user that is different than the user running the worker process in a
non-secured cluster. The bolt checks for the key `hdfs.proxyuser` in the
map that is set as the value for the key set as the config key in
the bolt using the withConfigKey method. The key value pair just mentioned is
passed to the topology in the configuration object during creation of the
topology. The HDFS config core-site.xml needs to be modified for HDFS to
allow such a proxy user functionality. More details on how to modify the file
and its limitations can be found at

http://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-common/Superusers.html




## Support for HDFS Sequence Files

The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write storm data to HDFS sequence files:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
Expand All @@ -34,6 +35,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Map;
import java.util.Timer;
Expand All @@ -54,6 +56,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
protected String configKey;
protected transient Object writeLock;
protected transient Timer rotationTimer; // only used for TimedRotationPolicy
protected transient UserGroupInformation userGroupInformation;

protected transient Configuration hdfsConfig;

Expand Down Expand Up @@ -81,7 +84,8 @@ protected void rotateOutputFile() throws IOException {
* @param topologyContext
* @param collector
*/
public final void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector){
public final void prepare(final Map conf, final TopologyContext
topologyContext, final OutputCollector collector){
this.writeLock = new Object();
if (this.syncPolicy == null) throw new IllegalStateException("SyncPolicy must be specified.");
if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
Expand All @@ -99,11 +103,23 @@ public final void prepare(Map conf, TopologyContext topologyContext, OutputColle
}
}


try{
HdfsSecurityUtil.login(conf, hdfsConfig);
doPrepare(conf, topologyContext, collector);
this.currentFile = createOutputFile();
String ugiUser = (String) this.hdfsConfig.get("hdfs.proxyuser");
if (ugiUser == null) {
this.userGroupInformation = UserGroupInformation.getLoginUser();
} else {
this.userGroupInformation = UserGroupInformation.createProxyUser
(ugiUser,
UserGroupInformation.getLoginUser());
}
this.userGroupInformation.doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
doPrepare(conf, topologyContext, collector);
currentFile = createOutputFile();
return null;
}
});

} catch (Exception e){
throw new RuntimeException("Error preparing HdfsBolt: " + e.getMessage(), e);
Expand All @@ -116,9 +132,19 @@ public final void prepare(Map conf, TopologyContext topologyContext, OutputColle
@Override
public void run() {
try {
rotateOutputFile();
userGroupInformation.doAs(
new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
rotateOutputFile();
return null;
}
}
);
} catch(IOException e){
LOG.warn("IOException during scheduled file rotation.", e);
} catch (InterruptedException ie) {
LOG.warn("InterruptedException during scheduled file " +
"rotation.", ie);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import java.util.Map;

Expand Down Expand Up @@ -87,33 +88,45 @@ public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector
}

@Override
public void execute(Tuple tuple) {
public void execute(final Tuple tuple) {
try {
byte[] bytes = this.format.format(tuple);
synchronized (this.writeLock) {
out.write(bytes);
this.offset += bytes.length;

if (this.syncPolicy.mark(tuple, this.offset)) {
if (this.out instanceof HdfsDataOutputStream) {
((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
} else {
this.out.hsync();
this.userGroupInformation.doAs(
new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
byte[] bytes = format.format(tuple);
synchronized (writeLock) {
out.write(bytes);
offset += bytes.length;

if (syncPolicy.mark(tuple, offset)) {
if (out instanceof HdfsDataOutputStream) {
((HdfsDataOutputStream) out).hsync
(EnumSet.of(SyncFlag.UPDATE_LENGTH));
} else {
out.hsync();
}
syncPolicy.reset();
}
}

collector.ack(tuple);

if(rotationPolicy.mark(tuple, offset)){
rotateOutputFile(); // synchronized
offset = 0;
rotationPolicy.reset();
}
return null;
}
this.syncPolicy.reset();
}
}
);

this.collector.ack(tuple);

if(this.rotationPolicy.mark(tuple, this.offset)){
rotateOutputFile(); // synchronized
this.offset = 0;
this.rotationPolicy.reset();
}
} catch (IOException e) {
this.collector.reportError(e);
this.collector.fail(tuple);
} catch (InterruptedException ie) {
this.collector.reportError(ie);
this.collector.fail(tuple);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Map;

public class SequenceFileBolt extends AbstractHdfsBolt {
Expand Down Expand Up @@ -104,29 +105,38 @@ public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector
}

@Override
public void execute(Tuple tuple) {
public void execute(final Tuple tuple) {
try {
long offset;
synchronized (this.writeLock) {
this.writer.append(this.format.key(tuple), this.format.value(tuple));
offset = this.writer.getLength();

if (this.syncPolicy.mark(tuple, offset)) {
this.writer.hsync();
this.syncPolicy.reset();
}
}

this.collector.ack(tuple);
if (this.rotationPolicy.mark(tuple, offset)) {
rotateOutputFile(); // synchronized
this.rotationPolicy.reset();
}
this.userGroupInformation.doAs(
new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
long offset;
synchronized (writeLock) {
writer.append(format.key(tuple), format.value(tuple));
offset = writer.getLength();

if (syncPolicy.mark(tuple, offset)) {
writer.hsync();
syncPolicy.reset();
}
}

collector.ack(tuple);
if (rotationPolicy.mark(tuple, offset)) {
rotateOutputFile(); // synchronized
rotationPolicy.reset();
}
return null;
}
}
);
} catch (IOException e) {
this.collector.reportError(e);
this.collector.fail(tuple);
} catch (InterruptedException ie) {
this.collector.reportError(ie);
this.collector.fail(tuple);
}

}

Path createOutputFile() throws IOException {
Expand Down
Loading