From 37b5c1691a8f8264d822818e4b865ad70ac99f6e Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Thu, 11 Jun 2026 12:58:17 -0400 Subject: [PATCH 1/2] ref(workerchild): add produce future success/failure metrics --- .../src/taskbroker_client/worker/workerchild.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 97e0067e..669dc4d1 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -267,6 +267,12 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: # We don't care about the actual result value, # we just care if result() raises or not [f.result(RESULT_TIMEOUT_SEC) for f in task.pending_futures] + metrics.incr( + "taskworker.worker.produce.success", + tags={ + "processing_pool": processing_pool_name, + }, + ) # If any pending producer futures failed, retry the task except Exception: task.status = TASK_ACTIVATION_STATUS_FAILURE @@ -274,6 +280,12 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: retry_state = task.inflight.activation.retry_state if not task.task_func.retry.max_attempts_reached(retry_state): task.status = TASK_ACTIVATION_STATUS_RETRY + metrics.incr( + "taskworker.worker.produce.failure", + tags={ + "processing_pool": processing_pool_name, + }, + ) pending_task_futures.remove(task) _task_execution_complete( inflight=task.inflight, From 4c8d26d15acfcc072f7fe1af1c1adcf1b03115df Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Thu, 11 Jun 2026 13:20:38 -0400 Subject: [PATCH 2/2] emit single metric --- .../taskbroker_client/worker/workerchild.py | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 669dc4d1..9dd27c70 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -267,12 +267,7 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: # We don't care about the actual result value, # we just care if result() raises or not [f.result(RESULT_TIMEOUT_SEC) for f in task.pending_futures] - metrics.incr( - "taskworker.worker.produce.success", - tags={ - "processing_pool": processing_pool_name, - }, - ) + produce_status = "success" # If any pending producer futures failed, retry the task except Exception: task.status = TASK_ACTIVATION_STATUS_FAILURE @@ -280,12 +275,14 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: retry_state = task.inflight.activation.retry_state if not task.task_func.retry.max_attempts_reached(retry_state): task.status = TASK_ACTIVATION_STATUS_RETRY - metrics.incr( - "taskworker.worker.produce.failure", - tags={ - "processing_pool": processing_pool_name, - }, - ) + produce_status = "failure" + metrics.incr( + "taskworker.worker.produce.result", + tags={ + "status": produce_status, + "processing_pool": processing_pool_name, + }, + ) pending_task_futures.remove(task) _task_execution_complete( inflight=task.inflight,