fix(TaskProducer): call close() on shutdown #718
) -> ProducerFuture[BrokerValue[KafkaPayload]]: ...

class CloseableProducerProtocol(Protocol):
I made a new type rather than updating ProducerProtocol because that's used by TaskbrokerApp's producer factory, which returns a SingletonProducer in sentry, which has a different name for its shutdown method.
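A minimal sketch of the idea in the comment above, not the PR's actual code: a separate protocol carries `close()`, so implementers of the existing `ProducerProtocol` need no changes. The `produce` signature, the use of `concurrent.futures.Future`, the inheritance between the two protocols, and the `InMemoryProducer` and `shut_down` names are all assumptions made for illustration.

```python
from concurrent.futures import Future
from typing import Protocol


class ProducerProtocol(Protocol):
    """Simplified stand-in for the existing protocol: produce only."""

    def produce(self, topic: str, payload: bytes) -> "Future[None]": ...


class CloseableProducerProtocol(ProducerProtocol, Protocol):
    """Adds close() without widening what ProducerProtocol requires.

    Whether the real protocol extends ProducerProtocol is an assumption.
    """

    def close(self) -> "Future[None]": ...


class InMemoryProducer:
    """Hypothetical producer; satisfies the protocol structurally."""

    def __init__(self) -> None:
        self.sent = []
        self.closed = False

    def produce(self, topic: str, payload: bytes) -> "Future[None]":
        self.sent.append((topic, payload))
        future: "Future[None]" = Future()
        future.set_result(None)
        return future

    def close(self) -> "Future[None]":
        self.closed = True
        future: "Future[None]" = Future()
        future.set_result(None)
        return future


def shut_down(producer: CloseableProducerProtocol) -> None:
    """Accept only producers that expose close(); wait for it to finish."""
    producer.close().result()
```

A producer whose shutdown method has a different name would not type-check against `CloseableProducerProtocol`, but it would still satisfy `ProducerProtocol`, which is the separation the comment describes.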
dff0895 to bc90e93
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit bc90e93.
bc90e93 to aa29295
)
)

def _shutdown(self) -> None:
Bug: The call to self._inner_producer.close().result() in the _shutdown handler lacks a timeout, which can cause the process to hang indefinitely if the Kafka broker is unreachable.
Severity: HIGH
Suggested Fix
Add a timeout to the self._inner_producer.close().result() call within the _shutdown method. This will prevent the process from hanging indefinitely if the Kafka broker is unresponsive during shutdown. For example, use self._inner_producer.close().result(timeout=30).
Location: clients/python/src/taskbroker_client/worker/producer.py#L105
Potential issue: The `_shutdown` function, registered via `atexit`, calls
`self._inner_producer.close().result()` without a timeout. The underlying
`arroyo.KafkaProducer.close()` method flushes pending messages and blocks until they are
acknowledged by the Kafka broker. If the broker is unreachable or slow during process
shutdown (e.g., due to a network partition or rolling restart), the `.result()` call
will block indefinitely. Since this occurs within an `atexit` handler, the process will
never terminate cleanly.
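The suggested fix can be sketched as follows. This is an illustration under stated assumptions, not the code in `producer.py`: `StubProducer` and `TaskProducerSketch` are hypothetical names, `close()` is assumed to return a `concurrent.futures.Future`, and the 30-second default is taken from the suggestion above.

```python
import atexit
from concurrent.futures import Future
from concurrent.futures import TimeoutError as FutureTimeoutError


class StubProducer:
    """Hypothetical inner producer; close() returns a Future."""

    def __init__(self, responsive: bool) -> None:
        self.responsive = responsive

    def close(self) -> Future:
        future: Future = Future()
        if self.responsive:
            # Simulates a reachable broker acknowledging the final flush.
            future.set_result(None)
        # For an unresponsive broker the future is never resolved.
        return future


class TaskProducerSketch:
    """Hypothetical wrapper that bounds how long shutdown may block."""

    def __init__(self, inner_producer, close_timeout: float = 30.0) -> None:
        self._inner_producer = inner_producer
        self._close_timeout = close_timeout
        # Run the bounded close when the interpreter exits.
        atexit.register(self._shutdown)

    def _shutdown(self) -> bool:
        """Return True if close() finished in time, False if it timed out."""
        try:
            self._inner_producer.close().result(timeout=self._close_timeout)
            return True
        except FutureTimeoutError:
            # Give up so the process can still exit; pending messages may be lost.
            return False
```

The trade-off is explicit: with a timeout the process always exits, at the cost of possibly dropping unflushed messages when the broker is unreachable.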
If it's unreachable, we have bigger problems.

This PR adds a new CloseableProducerProtocol interface for TaskProducer. Previously, TaskProducer never called the arroyo producer's close() method, so the inner Confluent producer would never flush().