Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
Expand Down Expand Up @@ -59,7 +57,7 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {

public static final String TOPIC = "topic";

private KafkaProducer<K, V> producer;
private static KafkaProducer producer;
private OutputCollector collector;
private TupleToKafkaMapper<K,V> mapper;
private KafkaTopicSelector topicSelector;
Expand Down Expand Up @@ -119,7 +117,12 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
}
}

producer = mkProducer(boltSpecifiedProperties);
synchronized (KafkaBolt.class) {
if (producer == null) {
producer = mkProducer(boltSpecifiedProperties);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since you are touching this code, can you please rename the method to mkProducer to newProducer or createProducer. I don't think mk is conventional and/or meaningful.

}
}

this.collector = collector;
}

Expand Down Expand Up @@ -186,7 +189,12 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {

@Override
public void cleanup() {
producer.close();
synchronized (KafkaBolt.class) {
if (producer != null) {
producer.close();
producer = null;
}
}
}

/**
Expand Down