Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@
*******************************************************************************/
package org.apache.storm.eventhubs.bolt;

import java.util.Map;
import java.util.concurrent.ExecutionException;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionSender;
import com.microsoft.azure.servicebus.ServiceBusException;
import org.apache.storm.eventhubs.spout.EventHubException;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
Expand All @@ -31,82 +28,102 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ExecutionException;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionSender;
import com.microsoft.azure.servicebus.ServiceBusException;

/**
* A bolt that writes event message to EventHub.
*
* <p>
* The implementation has two modes of operation:
* <ul>
* <li>partitionmode = true, One bolt for per partition write.</li>
* <li>partitionmode = false, use default partitioning key strategy to write to
* partition(s)</li>
* </ul>
* </p>
*/
public class EventHubBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory
.getLogger(EventHubBolt.class);
private static final Logger logger = LoggerFactory.getLogger(EventHubBolt.class);

protected OutputCollector collector;
protected PartitionSender sender;
protected EventHubClient ehClient;
protected PartitionSender sender;
protected EventHubBoltConfig boltConfig;

/**
* Constructs an instance that uses the specified connection string to connect
* to an EventHub and write to the specified entityPath
*
* @param connectionString
* EventHub connection String
* @param entityPath
* entity path to write to
*/
public EventHubBolt(String connectionString, String entityPath) {
boltConfig = new EventHubBoltConfig(connectionString, entityPath);
}

public EventHubBolt(String userName, String password, String namespace,
String entityPath, boolean partitionMode) {
boltConfig = new EventHubBoltConfig(userName, password, namespace,
entityPath, partitionMode);
/**
* Constructs an instance that connects to an EventHub using the specified
* connection credentials.
*
* @param userName
* UserName to connect as
* @param password
* Password to use
* @param namespace
* target namespace for the service bus
* @param entityPath
* Name of the event hub
* @param partitionMode
* number of partitions
*/
public EventHubBolt(String userName, String password, String namespace, String entityPath, boolean partitionMode) {
boltConfig = new EventHubBoltConfig(userName, password, namespace, entityPath, partitionMode);
}

/**
* Constructs an instance using the specified configuration
*
* @param config
* EventHub connection and partition configuration
*/
public EventHubBolt(EventHubBoltConfig config) {
boltConfig = config;
}

@Override
public void prepare(Map config, TopologyContext context,
OutputCollector collector) {
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
String myPartitionId = null;
if (boltConfig.getPartitionMode()) {
// We can use the task index (starting from 0) as the partition ID
myPartitionId = "" + context.getThisTaskIndex();
}
logger.info("creating sender: " + boltConfig.getConnectionString()
+ ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
logger.info(String.format("Conn String: %s, PartitionMode: %s", boltConfig.getConnectionString(),
String.valueOf(boltConfig.getPartitionMode())));
try {
ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
if (boltConfig.getPartitionMode()) {
sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
String partitionId = String.valueOf(context.getThisTaskIndex());
logger.info("Writing to partition id: " + partitionId);
sender = ehClient.createPartitionSenderSync(partitionId);
}
} catch (Exception ex) {
collector.reportError(ex);
throw new RuntimeException(ex);
}

}

@Override
public void execute(Tuple tuple) {
try {
EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple));
if (boltConfig.getPartitionMode() && sender!=null) {
sender.sendSync(sendEvent);
}
else if (boltConfig.getPartitionMode() && sender==null) {
throw new EventHubException("Sender is null");
}
else if (!boltConfig.getPartitionMode() && ehClient!=null) {
if (sender == null) {
ehClient.sendSync(sendEvent);
}
else if (!boltConfig.getPartitionMode() && ehClient==null) {
throw new EventHubException("ehclient is null");
} else {
sender.sendSync(sendEvent);
}
collector.ack(tuple);
} catch (EventHubException ex ) {
collector.reportError(ex);
collector.fail(tuple);
} catch (ServiceBusException e) {
collector.reportError(e);
collector.fail(tuple);
Expand All @@ -115,32 +132,31 @@ else if (!boltConfig.getPartitionMode() && ehClient==null) {

@Override
public void cleanup() {
if(sender != null) {
try {
sender.close().whenComplete((voidargs,error)->{
try{
if(error!=null){
logger.error("Exception during sender cleanup phase"+error.toString());
}
ehClient.closeSync();
}catch (Exception e){
logger.error("Exception during ehclient cleanup phase"+e.toString());
logger.debug("EventHubBolt cleanup");
try {
ehClient.close().whenComplete((voidargs, error) -> {
try {
if (error != null) {
logger.error("Exception during EventHubBolt cleanup phase" + error.toString());
}
}).get();
} catch (InterruptedException e) {
logger.error("Exception occured during cleanup phase"+e.toString());
} catch (ExecutionException e) {
logger.error("Exception occured during cleanup phase"+e.toString());
}
logger.info("Eventhub Bolt cleaned up");
if (sender != null) {
sender.closeSync();
}
} catch (Exception e) {
logger.error("Exception during EventHubBolt cleanup phase" + e.toString());
}
}).get();

} catch (InterruptedException | ExecutionException e) {
logger.error("Exception occured during cleanup phase" + e.toString());
} finally {
sender = null;
ehClient = null;
ehClient = null;
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

}
Loading