Skip to content

[STORM-3045] Microsoft Azure EventHubs Storm Spout and Bolt improvements#2588

Closed
SreeramGarlapati wants to merge 30 commits into
apache:masterfrom
SreeramGarlapati:master
Closed

[STORM-3045] Microsoft Azure EventHubs Storm Spout and Bolt improvements#2588
SreeramGarlapati wants to merge 30 commits into
apache:masterfrom
SreeramGarlapati:master

Conversation

@SreeramGarlapati

@SreeramGarlapati SreeramGarlapati commented Mar 7, 2018

Copy link
Copy Markdown

This is continuation of work done by @raviperi.

-update to the latest version of eventhubs java client
-Introduce config params to use latest EH client, control request prefetch size, batch size of events received per call.
-Refactor the code to group classes more appropriately
-Remove redundant types
-Javadoc comments where applicable
-Preftch config parameter to dictate EH prefetch count
-config parameter to introduce sleep between spout's nexttuple calls
-config parameter to retrieve a batched number of events per call to EH
(opposed to single event)
-New data scheme to group event payload and audit params into a single
type, and expose the single type as the only tuple field to downstream
bolts.

Close #2322

raviperi and others added 15 commits September 11, 2017 14:35
of events received per call.

-Refactor the code to group classes more appropriately
-Remove redundant types
-Javadoc comments where applicable
-Preftch config parameter to dictate EH prefetch count
-config parameter to introduce sleep between spout's nexttuple calls
-config parameter to retrieve a batched number of events per call to EH
(opposed to single event)
-New data scheme to group event payload and audit params into a single
type, and expose the single type as the only tuple field to downstream
bolts.
…upported schemes. Enable scheme based serialization in spout
# Conflicts:
#	external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
#	external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/FieldConstants.java
#	external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/format/StringEventDataScheme.java
#	external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
#	external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubException.java
#	external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
#	external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
#	external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
#	pom.xml

@srdo srdo left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial review, got down to the start of the Trident code. Will finish up later.

In addition to the other comments, could you please look at target/checkstyle-violations.xml when you've built, and try to resolve any violations that point to your changes? I noticed a number of places with weird indentation, tabs and ifs with no braces, that I'd like to see fixed, since we're trying to stick to the checkstyle rules for new code.

* @param password Password to use
* @param namespace target namespace for the service bus
* @param entityPath Name of the event hub
* @param partitionMode number of partitions

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This description might need to be updated.

public void prepare(Map<String, Object> config, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
logger.info(String.format("Conn String: %s, PartitionMode: %s", boltConfig.getConnectionString(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logger can do String.format inherently. use {} for placeholders (e.g. logger.info("Connection String: {}", boltConfig.getConnectionString())

* <p>
* The implementation has two modes of operation:
* <ul>
* <li>partitionmode = true, One bolt for per partition write.</li>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This seems like an enum would be a good fit.

ehClient.close().whenCompleteAsync((voidargs, error) -> {
try {
if (error != null) {
logger.error("Exception during EventHubBolt cleanup phase" + error.toString());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider logging the exception without toString so you don't lose the stack trace, i.e. logger.error("Some message", error). This looks like it's a problem in a few places.

public void cleanup() {
logger.debug("EventHubBolt cleanup");
try {
ehClient.close().whenCompleteAsync((voidargs, error) -> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Any reason to use whenCompleteAsync over whenComplete?

@Override
public void declareOutputFields(final OutputFieldsDeclarer declarer) {
List<String> fields = new LinkedList<String>();
fields.add(FieldConstants.MESSAGE_FIELD);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use the fields declared by the scheme here, otherwise what's the point of having IEventDataScheme provide a declareOutputFields?

import org.apache.storm.eventhubs.core.EventHubConfig;

/**
* EventHub configuration. This class remains in

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Didn't the other class moves already break binary compatibility?

private static final String ZK_LOCAL_URL = "localhost:2181";

private final String zookeeperConnectionString;
private final CuratorFramework curatorFramework;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is serializable

return null;
}

String data = new String(curatorFramework.getData().forPath(statePath));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better if you specify a charset

@Override
public void saveData(String statePath, String data) {
data = StringUtil.isNullOrWhiteSpace(data) ? "" : data;
byte[] bytes = data.getBytes();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should specify a charset

@SreeramGarlapati

SreeramGarlapati commented Mar 11, 2018

Copy link
Copy Markdown
Author

thanks @srdo - i will take a shot at your suggestions by EOD monday PST.

@SreeramGarlapati

Copy link
Copy Markdown
Author

@srdo - when I try to merge my changes with master - I ended up with a merge conflicts due to line endings change. is there any simple tip - which I can use to ease through this? like changed my files to a specific line ending before the merge etc?
Thanks a lot for the help so far!

@srdo

srdo commented Mar 24, 2018

Copy link
Copy Markdown
Contributor

@SreeramGarlapati Quick google suggests that you can try adding -Xrenormalize to the merge command. https://stackoverflow.com/a/12194759/8845188

@srdo

srdo commented Mar 24, 2018

Copy link
Copy Markdown
Contributor

And also, yes you can change the line ending to LF and that should work too

@srdo

srdo commented Mar 24, 2018

Copy link
Copy Markdown
Contributor

By the way, when you get a chance, please go to https://issues.apache.org/jira and create an issue for tracking these changes, then rename the PR and commit(s) so they contain the issue number. It makes it easier for us to track which branches the changes are applied to, and helps us generate correct release notes. Thanks.

@SreeramGarlapati SreeramGarlapati changed the title Microsoft Azure EventHubs Storm Spout and Bolt improvements [STORM-3045] Microsoft Azure EventHubs Storm Spout and Bolt improvements Apr 27, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants