Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 18 additions & 21 deletions storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public StormTopology createTopology() {
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IRichBolt bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
return setBolt(id, bolt, 1);
}

/**
Expand All @@ -176,7 +176,7 @@ public BoltDeclarer setBolt(String id, IRichBolt bolt) throws IllegalArgumentExc
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
public BoltDeclarer setBolt(String id, IRichBolt bolt, int parallelism_hint) throws IllegalArgumentException {
validateUnusedId(id);
initCommon(id, bolt, parallelism_hint);
_bolts.put(id, bolt);
Expand All @@ -195,7 +195,7 @@ public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint)
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IBasicBolt bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
return setBolt(id, bolt, 1);
}

/**
Expand All @@ -210,7 +210,7 @@ public BoltDeclarer setBolt(String id, IBasicBolt bolt) throws IllegalArgumentEx
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
public BoltDeclarer setBolt(String id, IBasicBolt bolt, int parallelism_hint) throws IllegalArgumentException {
return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
}

Expand All @@ -225,7 +225,7 @@ public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint)
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
return setBolt(id, bolt, 1);
}

/**
Expand All @@ -239,7 +239,7 @@ public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumen
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
public BoltDeclarer setBolt(String id, IWindowedBolt bolt, int parallelism_hint) throws IllegalArgumentException {
return setBolt(id, new WindowedBoltExecutor(bolt), parallelism_hint);
}

Expand All @@ -258,7 +258,7 @@ public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelism_hi
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
return setBolt(id, bolt, 1);
}

/**
Expand All @@ -276,7 +276,7 @@ public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt)
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt, Number parallelism_hint) throws IllegalArgumentException {
public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt, int parallelism_hint) throws IllegalArgumentException {
hasStatefulBolt = true;
return setBolt(id, new StatefulBoltExecutor<T>(bolt), parallelism_hint);
}
Expand All @@ -294,7 +294,7 @@ public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt,
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
return setBolt(id, bolt, 1);
}

/**
Expand All @@ -310,7 +310,7 @@ public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt, Number parallelism_hint) throws IllegalArgumentException {
public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt, int parallelism_hint) throws IllegalArgumentException {
hasStatefulBolt = true;
return setBolt(id, new StatefulBoltExecutor<T>(new StatefulWindowedBoltExecutor<T>(bolt)), parallelism_hint);
}
Expand All @@ -323,7 +323,7 @@ public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public SpoutDeclarer setSpout(String id, IRichSpout spout) throws IllegalArgumentException {
return setSpout(id, spout, null);
return setSpout(id, spout, 1);
}

/**
Expand All @@ -336,18 +336,18 @@ public SpoutDeclarer setSpout(String id, IRichSpout spout) throws IllegalArgumen
* @param spout the spout
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) throws IllegalArgumentException {
public SpoutDeclarer setSpout(String id, IRichSpout spout, int parallelism_hint) throws IllegalArgumentException {
validateUnusedId(id);
initCommon(id, spout, parallelism_hint);
_spouts.put(id, spout);
return new SpoutGetter(id);
}

public void setStateSpout(String id, IRichStateSpout stateSpout) throws IllegalArgumentException {
setStateSpout(id, stateSpout, null);
setStateSpout(id, stateSpout, 1);
}

public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallelism_hint) throws IllegalArgumentException {
public void setStateSpout(String id, IRichStateSpout stateSpout, int parallelism_hint) throws IllegalArgumentException {
validateUnusedId(id);
// TODO: finish
}
Expand Down Expand Up @@ -433,16 +433,13 @@ private ComponentCommon getComponentCommon(String id, IComponent component) {
return ret;
}

private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
private void initCommon(String id, IComponent component, int parallelism) throws IllegalArgumentException {
ComponentCommon common = new ComponentCommon();
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
if(parallelism!=null) {
int dop = parallelism.intValue();
if(dop < 1) {
throw new IllegalArgumentException("Parallelism must be positive.");
}
common.set_parallelism_hint(dop);
if(parallelism < 1) {
throw new IllegalArgumentException("Parallelism must be positive.");
}
common.set_parallelism_hint(parallelism);
Map conf = component.getComponentConfiguration();
if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
_commons.put(id, common);
Expand Down