From c6f52d0846cce786e4a3ac6419a4304dd19a09f4 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Wed, 17 Jun 2026 15:20:44 -0400 Subject: [PATCH 1/2] ref(worker): add more observability for producer futures --- .../src/taskbroker_client/worker/producer.py | 23 ++--- .../taskbroker_client/worker/workerchild.py | 85 ++++++++++++++++--- clients/python/tests/worker/test_producer.py | 8 +- clients/python/tests/worker/test_worker.py | 16 ++-- 4 files changed, 102 insertions(+), 30 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/producer.py b/clients/python/src/taskbroker_client/worker/producer.py index 607263d9..09347a5c 100644 --- a/clients/python/src/taskbroker_client/worker/producer.py +++ b/clients/python/src/taskbroker_client/worker/producer.py @@ -1,5 +1,5 @@ import atexit -from collections import deque +from collections import defaultdict, deque from collections.abc import Callable from concurrent.futures import Future from typing import Any, Sequence @@ -14,10 +14,10 @@ # This is global as TaskWorker needs to be able to call TaskProducer.collect_futures() # without a reference to a task's specific instance of TaskProducer. -# Has a max_len to prevent unbounded future growth if TaskProducer.collect_futures() -# is never called. -_pending_futures: deque[ProducerFuture[BrokerValue[KafkaPayload]]] = deque( - maxlen=TASK_PRODUCER_MAX_PENDING_FUTURES +# Keys are the names of each `TaskProducer` instance in the current process, values are +# deques with a maxlen to prevent unbounded queue size if `collect_futures()` is never called. +_pending_futures: defaultdict[str, deque[ProducerFuture[BrokerValue[KafkaPayload]]]] = defaultdict( + lambda: deque(maxlen=TASK_PRODUCER_MAX_PENDING_FUTURES) ) @@ -55,18 +55,21 @@ def _get(self) -> CloseableProducerProtocol: return self._inner_producer def track_future(self, future: ProducerFuture[BrokerValue[KafkaPayload]]) -> None: - _pending_futures.append(future) + _pending_futures[self.name].append(future) self.metrics.gauge( "task.producer.pending.futures", - len(_pending_futures), + len(_pending_futures[self.name]), tags={"producer_name": self.name}, ) @staticmethod - def collect_futures() -> set[ProducerFuture[BrokerValue[KafkaPayload]]]: - futures = _pending_futures.copy() + def collect_futures() -> dict[str, set[ProducerFuture[BrokerValue[KafkaPayload]]]]: + """ + Clears the `_pending_futures` dict, and returns a copy with all values converted to sets. + """ + pending_copy = _pending_futures.copy() _pending_futures.clear() - return set(futures) + return {name: set(val) for name, val in pending_copy.items()} def produce( self, diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 9dd27c70..7db4521a 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -66,7 +66,7 @@ class ActivationWithPendingFutures: status: TaskActivationStatus.ValueType execution_start_time: float futures_start_time: float - pending_futures: set[ProducerFuture[BrokerValue[KafkaPayload]]] + pending_futures: dict[str, set[ProducerFuture[BrokerValue[KafkaPayload]]]] task_func: Task[Any, Any] @@ -263,24 +263,27 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: and submitted for retry (if the policy allows). """ RESULT_TIMEOUT_SEC = 1 + error_name: str = "N/A" try: # We don't care about the actual result value, # we just care if result() raises or not - [f.result(RESULT_TIMEOUT_SEC) for f in task.pending_futures] + [f.result(RESULT_TIMEOUT_SEC) for fut in task.pending_futures.values() for f in fut] produce_status = "success" # If any pending producer futures failed, retry the task - except Exception: + except Exception as e: task.status = TASK_ACTIVATION_STATUS_FAILURE if task.task_func.retry: retry_state = task.inflight.activation.retry_state if not task.task_func.retry.max_attempts_reached(retry_state): task.status = TASK_ACTIVATION_STATUS_RETRY produce_status = "failure" + error_name = type(e).__name__ metrics.incr( "taskworker.worker.produce.result", tags={ "status": produce_status, "processing_pool": processing_pool_name, + "error_name": error_name, }, ) pending_task_futures.remove(task) @@ -288,10 +291,22 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: inflight=task.inflight, next_state=task.status, execution_start_time=task.execution_start_time, + execution_end_time=task.futures_start_time, task_func=task.task_func, futures_start_time=task.futures_start_time, ) + def get_oldest_pending_activation() -> ActivationWithPendingFutures | None: + oldest: ActivationWithPendingFutures | None = None + max_age = 0.0 + now = time.time() + for task in pending_task_futures: + age = now - task.futures_start_time + if age > max_age: + max_age = age + oldest = task + return oldest + def check_task_future_completion() -> None: if len(pending_task_futures) > 0: # Records how many activations with pending producer futures @@ -304,8 +319,31 @@ def check_task_future_completion() -> None: }, ) for task in pending_task_futures.copy(): - if all([f.done() for f in task.pending_futures]): + future_status = [f.done() for fut in task.pending_futures.values() for f in fut] + if all(future_status): await_task_futures(task) + else: + # How many futures are still pending in this task + metrics.gauge( + "taskworker.task.incomplete_futures", + len([f for f in future_status if not f]), + tags={ + "processing_pool": processing_pool_name, + "namespace": task.inflight.activation.namespace, + "taskname": task.inflight.activation.taskname, + }, + ) + # How long has the oldest pending task been sitting in the queue + if oldest := get_oldest_pending_activation(): + metrics.gauge( + "taskworker.worker.oldest_pending_activation_age", + time.time() - oldest.futures_start_time, + tags={ + "processing_pool": processing_pool_name, + "namespace": oldest.inflight.activation.namespace, + "taskname": oldest.inflight.activation.taskname, + }, + ) while not shutdown_event.is_set() and not local_shutdown.is_set(): if max_task_count and processed_task_count >= max_task_count: @@ -463,16 +501,30 @@ def check_task_future_completion() -> None: # If the task function itself failed, we don't need to await any # producer futures since it'll be retried anyways if next_state != TASK_ACTIVATION_STATUS_COMPLETE: - task_produced_futures = set() + task_produced_futures = {} if len(task_produced_futures) == 0: _task_execution_complete( inflight, next_state, execution_start_time, + time.time(), task_func, ) else: + for name, futures in task_produced_futures.items(): + # How many futures were produced in the executed task, + # tagged by producer name + metrics.gauge( + "taskworker.task.futures_produced", + len(futures), + tags={ + "producer_name": name, + "processing_pool": processing_pool_name, + "namespace": inflight.activation.namespace, + "taskname": inflight.activation.taskname, + }, + ) pending_task = ActivationWithPendingFutures( inflight=inflight, status=next_state, @@ -582,9 +634,7 @@ def record_task_execution( task_added_time = activation.received_at.ToDatetime().timestamp() execution_duration = completion_time - start_time execution_latency = completion_time - task_added_time - futures_duration = ( - completion_time - futures_enqueued_time if futures_enqueued_time else None - ) + futures_duration = time.time() - futures_enqueued_time if futures_enqueued_time else 0 logger.debug( "taskworker.task_execution", @@ -625,7 +675,7 @@ def record_task_execution( "taskbroker_host": taskbroker_host, }, ) - if futures_duration: + if futures_duration != 0: metrics.distribution( "taskworker.worker.future_completion_duration", futures_duration, @@ -636,6 +686,18 @@ def record_task_execution( "taskbroker_host": taskbroker_host, }, ) + # Latency between task execution start and all producer futures completing + # (i.e. how long it took to put the task in the processed_tasks queue) + metrics.distribution( + "taskworker.worker.e2e_latency", + (execution_duration + futures_duration), + tags={ + "namespace": activation.namespace, + "taskname": activation.taskname, + "processing_pool": processing_pool_name, + "taskbroker_host": taskbroker_host, + }, + ) namespace = app.get_namespace(activation.namespace) metrics.incr( @@ -663,11 +725,10 @@ def _task_execution_complete( inflight: InflightTaskActivation, next_state: TaskActivationStatus.ValueType, execution_start_time: float, + execution_end_time: float, task_func: Task[Any, Any] | None, futures_start_time: float | None = None, ) -> None: - # Get completion time before pushing to queue, so we can measure queue append time - execution_complete_time = time.time() with metrics.timer( "taskworker.worker.processed_tasks.put.duration", tags={ @@ -709,7 +770,7 @@ def _task_execution_complete( inflight.activation, next_state, execution_start_time, - execution_complete_time, + execution_end_time, processing_pool_name, inflight.host, futures_start_time, diff --git a/clients/python/tests/worker/test_producer.py b/clients/python/tests/worker/test_producer.py index e8eafd4b..afb6e52c 100644 --- a/clients/python/tests/worker/test_producer.py +++ b/clients/python/tests/worker/test_producer.py @@ -57,7 +57,8 @@ def test_producer_tracks_futures() -> None: producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True)) producer.produce(Topic("test"), make_kafka_payload()) assert len(_pending_futures) == 1 - future = next(iter(TaskProducer.collect_futures())) + collected = TaskProducer.collect_futures() + future = next(iter(collected["test.producer"])) assert future.result() == make_broker_value() assert len(_pending_futures) == 0 @@ -70,7 +71,8 @@ def callback(future: Future[BrokerValue[KafkaPayload]]) -> None: received.append(future) producer.produce(Topic("test"), make_kafka_payload(), callbacks=[callback]) - tracked_future = next(iter(TaskProducer.collect_futures())) + collected = TaskProducer.collect_futures() + tracked_future = next(iter(collected["test.producer"])) assert len(received) == 1 assert received[0] is tracked_future @@ -91,4 +93,4 @@ def test_pending_futures_max_len() -> None: producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True)) for _ in range(10001): producer.produce(Topic("test"), make_kafka_payload()) - assert len(_pending_futures) == 10000 + assert len(_pending_futures["test.producer"]) == 10000 diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index 2ce3130a..eaecf269 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -1503,7 +1503,7 @@ def test_child_process_tracks_producer_futures( todo.put(task) with mock.patch.object( - TaskProducer, "collect_futures", return_value={done_future} + TaskProducer, "collect_futures", return_value={"test.producer": {done_future}} ) as collect_mock: child_process( "examples.app:app", @@ -1549,7 +1549,9 @@ def observe_and_resolve() -> None: observer = threading.Thread(target=observe_and_resolve, name="future-observer") observer.start() try: - with mock.patch.object(TaskProducer, "collect_futures", return_value={pending_future}): + with mock.patch.object( + TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}} + ): child_process( "examples.app:app", todo, @@ -1593,7 +1595,9 @@ def deliver_sigterm() -> None: sigterm_thread = threading.Thread(target=deliver_sigterm, name="sigterm-sender") sigterm_thread.start() try: - with mock.patch.object(TaskProducer, "collect_futures", return_value={pending_future}): + with mock.patch.object( + TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}} + ): child_process( "examples.app:app", todo, @@ -1639,7 +1643,9 @@ def test_child_process_retries_on_failed_future( failed_future.set_exception(RuntimeError("kafka produce failed")) todo.put(retriable_task) - with mock.patch.object(TaskProducer, "collect_futures", return_value={failed_future}): + with mock.patch.object( + TaskProducer, "collect_futures", return_value={"test.producer": {failed_future}} + ): child_process( "examples.app:app", todo, @@ -1660,7 +1666,7 @@ def test_child_process_clears_pending_futures_when_task_fails( ) -> None: leftover_future: Future[BrokerValue[KafkaPayload]] = Future() leftover_future.set_result(_make_broker_value()) - _pending_futures.append(leftover_future) + _pending_futures["test.producer"].append(leftover_future) assert len(_pending_futures) == 1 todo: queue.Queue[InflightTaskActivation] = queue.Queue() From 69414642f62dfc0620957af7ff7ded5c1cb70c11 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Thu, 18 Jun 2026 11:14:49 -0400 Subject: [PATCH 2/2] dist over gauge --- clients/python/src/taskbroker_client/worker/workerchild.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 7db4521a..7d23669f 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -324,7 +324,7 @@ def check_task_future_completion() -> None: await_task_futures(task) else: # How many futures are still pending in this task - metrics.gauge( + metrics.distribution( "taskworker.task.incomplete_futures", len([f for f in future_status if not f]), tags={ @@ -335,7 +335,7 @@ def check_task_future_completion() -> None: ) # How long has the oldest pending task been sitting in the queue if oldest := get_oldest_pending_activation(): - metrics.gauge( + metrics.distribution( "taskworker.worker.oldest_pending_activation_age", time.time() - oldest.futures_start_time, tags={