Currently, `produce` pushes to Kafka in an infinite loop: every `BufferError` triggers another retry, with no upper bound. If the producer never recovers, the caller is blocked forever.
We should consider defining a maximum number of tries, after which an exception is raised.

    def produce(self, record):
        while True:
            try:
                self._producer.produce(
                    topic=self._topic,
                    key=record.key_to_avro_dict(),
                    value=record.value_to_avro_dict(),
                    on_delivery=self._delivery_report
                )
                self._producer.poll(0)
                break
            except BufferError as e:
                print(
                    f'Failed to send on attempt {record}. '
                    f'Error received {str(e)}')
                self._producer.poll(1)

The snippet above is from py2k/py2k/producer.py, lines 24 to 39 in dfe8d5e.
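
One possible shape for the bounded retry is sketched below. It is a minimal illustration, not a proposed final implementation. The names `produce_with_retries`, `ProduceRetriesExceeded`, `max_tries`, and `poll_timeout` are hypothetical and do not exist in py2k. The sketch only assumes a producer object with `produce(...)` and `poll(timeout)` methods that raises `BufferError`, as in the snippet above.

```python
class ProduceRetriesExceeded(Exception):
    """Hypothetical exception raised once every attempt has failed."""


def produce_with_retries(producer, max_tries=5, poll_timeout=1.0, **produce_kwargs):
    """Call producer.produce(**produce_kwargs), retrying on BufferError.

    Returns the 1-based number of the attempt that succeeded.
    Raises ProduceRetriesExceeded after max_tries failed attempts.
    """
    if max_tries < 1:
        raise ValueError("max_tries must be at least 1")

    last_error = None
    for attempt in range(1, max_tries + 1):
        try:
            producer.produce(**produce_kwargs)
            producer.poll(0)  # serve delivery callbacks without blocking
            return attempt
        except BufferError as e:
            last_error = e
            print(
                f"Failed to send on attempt {attempt} of {max_tries}. "
                f"Error received: {e}"
            )
            # Same back-off as the current code: poll before trying again.
            producer.poll(poll_timeout)

    # Every attempt failed: surface the problem instead of looping forever.
    raise ProduceRetriesExceeded(
        f"Gave up after {max_tries} attempts"
    ) from last_error
```

With something like this, `produce` could pass `self._producer` along with the existing `topic`, `key`, `value`, and `on_delivery` arguments, and callers would decide how to handle the exception. Whether the limit should be a constructor argument or a per-call argument is an open design choice.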