diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java index 30f97a0e16..ffa06b1eeb 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java @@ -20,10 +20,8 @@ import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt; import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.TupleUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -59,7 +57,7 @@ public class KafkaBolt extends BaseTickTupleAwareRichBolt { public static final String TOPIC = "topic"; - private KafkaProducer producer; + private static KafkaProducer producer; private OutputCollector collector; private TupleToKafkaMapper mapper; private KafkaTopicSelector topicSelector; @@ -119,7 +117,12 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll } } - producer = mkProducer(boltSpecifiedProperties); + synchronized (KafkaBolt.class) { + if (producer == null) { + producer = mkProducer(boltSpecifiedProperties); + } + } + this.collector = collector; } @@ -186,7 +189,12 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { @Override public void cleanup() { - producer.close(); + synchronized (KafkaBolt.class) { + if (producer != null) { + producer.close(); + producer = null; + } + } } /**