diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java index 92cad77685..2c8cde3a5a 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java +++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java @@ -164,7 +164,7 @@ public StormTopology createTopology() { * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ public BoltDeclarer setBolt(String id, IRichBolt bolt) throws IllegalArgumentException { - return setBolt(id, bolt, null); + return setBolt(id, bolt, 1); } /** @@ -176,7 +176,7 @@ public BoltDeclarer setBolt(String id, IRichBolt bolt) throws IllegalArgumentExc * @return use the returned object to declare the inputs to this component * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ - public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException { + public BoltDeclarer setBolt(String id, IRichBolt bolt, int parallelism_hint) throws IllegalArgumentException { validateUnusedId(id); initCommon(id, bolt, parallelism_hint); _bolts.put(id, bolt); @@ -195,7 +195,7 @@ public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ public BoltDeclarer setBolt(String id, IBasicBolt bolt) throws IllegalArgumentException { - return setBolt(id, bolt, null); + return setBolt(id, bolt, 1); } /** @@ -210,7 +210,7 @@ public BoltDeclarer setBolt(String id, IBasicBolt bolt) throws IllegalArgumentEx * @return use the returned object to declare the inputs to this component * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ - public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) throws IllegalArgumentException { + public BoltDeclarer setBolt(String id, IBasicBolt bolt, int parallelism_hint) throws IllegalArgumentException { return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint); } @@ -225,7 +225,7 @@ public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException { - return setBolt(id, bolt, null); + return setBolt(id, bolt, 1); } /** @@ -239,7 +239,7 @@ public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumen * @return use the returned object to declare the inputs to this component * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ - public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelism_hint) throws IllegalArgumentException { + public BoltDeclarer setBolt(String id, IWindowedBolt bolt, int parallelism_hint) throws IllegalArgumentException { return setBolt(id, new WindowedBoltExecutor(bolt), parallelism_hint); } @@ -258,7 +258,7 @@ public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelism_hi * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ public BoltDeclarer setBolt(String id, IStatefulBolt bolt) throws IllegalArgumentException { - return setBolt(id, bolt, null); + return setBolt(id, bolt, 1); } /** @@ -276,7 +276,7 @@ public BoltDeclarer setBolt(String id, IStatefulBolt bolt) * @return use the returned object to declare the inputs to this component * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ - public BoltDeclarer setBolt(String id, IStatefulBolt bolt, Number parallelism_hint) throws IllegalArgumentException { + public BoltDeclarer setBolt(String id, IStatefulBolt bolt, int parallelism_hint) throws IllegalArgumentException { hasStatefulBolt = true; return setBolt(id, new StatefulBoltExecutor(bolt), parallelism_hint); } @@ -294,7 +294,7 @@ public BoltDeclarer setBolt(String id, IStatefulBolt bolt, * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ public BoltDeclarer setBolt(String id, IStatefulWindowedBolt bolt) throws IllegalArgumentException { - return setBolt(id, bolt, null); + return setBolt(id, bolt, 1); } /** @@ -310,7 +310,7 @@ public BoltDeclarer setBolt(String id, IStatefulWindowedBolt BoltDeclarer setBolt(String id, IStatefulWindowedBolt bolt, Number parallelism_hint) throws IllegalArgumentException { + public BoltDeclarer setBolt(String id, IStatefulWindowedBolt bolt, int parallelism_hint) throws IllegalArgumentException { hasStatefulBolt = true; return setBolt(id, new StatefulBoltExecutor(new StatefulWindowedBoltExecutor(bolt)), parallelism_hint); } @@ -323,7 +323,7 @@ public BoltDeclarer setBolt(String id, IStatefulWindowedBolt()); - if(parallelism!=null) { - int dop = parallelism.intValue(); - if(dop < 1) { - throw new IllegalArgumentException("Parallelism must be positive."); - } - common.set_parallelism_hint(dop); + if(parallelism < 1) { + throw new IllegalArgumentException("Parallelism must be positive."); } + common.set_parallelism_hint(parallelism); Map conf = component.getComponentConfiguration(); if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf)); _commons.put(id, common);