We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent 2ddf936 commit 65635fcCopy full SHA for 65635fc
stonesoup/writer/kafka.py
@@ -0,0 +1,23 @@
1
+import json
2
+from typing import Dict
3
+
4
+from confluent_kafka import Producer
5
+from stonesoup.base import Property
6
+from stonesoup.writer import Writer
7
8
9
+class KafkaWriter(Writer):
10
+ """Write data to a Kafka topic"""
11
+ kafka_config: Dict[str, str] = Property(
12
+ default={}, doc="Keyword arguments for the underlying kafka producer"
13
+ )
14
15
+ def __init__(self, *args, **kwargs):
16
+ super().__init__(*args, **kwargs)
17
+ self._producer = Producer(self.kafka_config)
18
19
+ def write(self, topic, data, flush=True):
20
+ as_json = json.dumps(data)
21
+ self._producer.produce(topic, as_json)
22
+ if flush:
23
+ self._producer.flush()
0 commit comments