diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index eeb04d5c..97e0067e 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -284,16 +284,16 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: ) def check_task_future_completion() -> None: - # Records how many activations with pending producer futures - # the worker child has. - metrics.gauge( - "taskworker.worker.activations_with_pending_futures", - len(pending_task_futures), - tags={ - "processing_pool": processing_pool_name, - }, - ) if len(pending_task_futures) > 0: + # Records how many activations with pending producer futures + # the worker child has. Only records when there are pending activations. + metrics.gauge( + "taskworker.worker.activations_with_pending_futures", + len(pending_task_futures), + tags={ + "processing_pool": processing_pool_name, + }, + ) for task in pending_task_futures.copy(): if all([f.done() for f in task.pending_futures]): await_task_futures(task) @@ -310,13 +310,13 @@ def check_task_future_completion() -> None: break try: + check_task_future_completion() inflight = child_tasks.get(timeout=1.0) except queue.Empty: metrics.incr( "taskworker.worker.child_task_queue_empty", tags={"processing_pool": processing_pool_name}, ) - check_task_future_completion() continue task_func = _get_known_task(inflight.activation) @@ -474,8 +474,6 @@ def check_task_future_completion() -> None: ) pending_task_futures.append(pending_task) - check_task_future_completion() - # Once we get the shutdown signal, drain any pending futures for task in pending_task_futures.copy(): await_task_futures(task)