Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions clients/python/src/taskbroker_client/types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import contextlib
import dataclasses
from collections.abc import MutableMapping
from concurrent.futures import Future
from typing import Any, Callable, Protocol

from arroyo.backends.abstract import ProducerFuture
Expand Down Expand Up @@ -42,6 +43,16 @@ def produce(
) -> ProducerFuture[BrokerValue[KafkaPayload]]: ...


class CloseableProducerProtocol(Protocol):

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a new type rather than updating ProducerProtocol because that's used by TaskbrokerApp's producer factory, which returns a SingletonProducer in sentry, which has a different name for its shutdown method.

"""Interface used by TaskProducer. Represents a producer that has a shutdown method."""

def produce(
self, dest: Topic | Partition, payload: KafkaPayload
) -> ProducerFuture[BrokerValue[KafkaPayload]]: ...

def close(self) -> Future[None]: ...


ProducerFactory = Callable[[str], ProducerProtocol]
"""
A factory interface for resolving topics into a KafkaProducer
Expand Down
14 changes: 10 additions & 4 deletions clients/python/src/taskbroker_client/worker/producer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import atexit
from collections import deque
from collections.abc import Callable
from concurrent.futures import Future
Expand All @@ -9,7 +10,7 @@

from taskbroker_client.constants import TASK_PRODUCER_MAX_PENDING_FUTURES
from taskbroker_client.metrics import MetricsBackend, NoOpMetricsBackend
from taskbroker_client.types import ProducerProtocol
from taskbroker_client.types import CloseableProducerProtocol

# This is global as TaskWorker needs to be able to call TaskProducer.collect_futures()
# without a reference to a task's specific instance of TaskProducer.
Expand Down Expand Up @@ -39,17 +40,18 @@ class TaskProducer:
def __init__(
self,
name: str,
producer_factory: Callable[[], ProducerProtocol],
producer_factory: Callable[[], CloseableProducerProtocol],
metrics_backend: MetricsBackend | None = None,
) -> None:
self.name = name
self._producer_factory = producer_factory
self._inner_producer: ProducerProtocol | None = None
self._inner_producer: CloseableProducerProtocol | None = None
self.metrics = metrics_backend if metrics_backend is not None else NoOpMetricsBackend()

def _get(self) -> ProducerProtocol:
def _get(self) -> CloseableProducerProtocol:
if self._inner_producer is None:
self._inner_producer = self._producer_factory()
atexit.register(self._shutdown)
Comment thread
sentry[bot] marked this conversation as resolved.
return self._inner_producer

def track_future(self, future: ProducerFuture[BrokerValue[KafkaPayload]]) -> None:
Expand Down Expand Up @@ -99,3 +101,7 @@ def produce(
"or instantiate your producer with `use_simple_futures=False`."
)
)

def _shutdown(self) -> None:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The call to self._inner_producer.close().result() in the _shutdown handler lacks a timeout, which can cause the process to hang indefinitely if the Kafka broker is unreachable.
Severity: HIGH

Suggested Fix

Add a timeout to the self._inner_producer.close().result() call within the _shutdown method. This will prevent the process from hanging indefinitely if the Kafka broker is unresponsive during shutdown. For example, use self._inner_producer.close().result(timeout=30).

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: clients/python/src/taskbroker_client/worker/producer.py#L105

Potential issue: The `_shutdown` function, registered via `atexit`, calls
`self._inner_producer.close().result()` without a timeout. The underlying
`arroyo.KafkaProducer.close()` method flushes pending messages and blocks until they are
acknowledged by the Kafka broker. If the broker is unreachable or slow during process
shutdown (e.g., due to a network partition or rolling restart), the `.result()` call
will block indefinitely. Since this occurs within an `atexit` handler, the process will
never terminate cleanly.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if its unreachable we have bigger problems

if self._inner_producer is not None:
self._inner_producer.close().result()
5 changes: 5 additions & 0 deletions clients/python/tests/worker/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ def produce(
future.set_result(make_broker_value())
return future

def close(self) -> Future[None]:
f: Future[None] = Future()
f.set_result(None)
return f


def get_dummy_producer(use_simple_futures: bool) -> DummyProducer:
return DummyProducer(use_simple_futures=use_simple_futures)
Expand Down
Loading