ref(worker): add more observability for producer futures#719
Conversation
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit c6f52d0. Configure here.
| "processing_pool": processing_pool_name, | ||
| "taskbroker_host": taskbroker_host, | ||
| }, | ||
| ) |
There was a problem hiding this comment.
Kafka wait omitted from latency
Medium Severity
For activations that await producer futures, record_task_execution still derives execution_latency, COGS usage, and Sentry check-in duration from completion_time, which is now futures_start_time rather than wall-clock completion. Time spent waiting on Kafka futures is dropped from those values even though e2e_latency includes it.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit c6f52d0. Configure here.
There was a problem hiding this comment.
I'm not sure how we'd want to handle these - in theory tasks aren't "complete" until all produced futures are complete, but I still see task function execution time & future awaiting time as separate things we should track
Open to advice here
There was a problem hiding this comment.
it seems to me that what we're measuring here is the additional time it takes to move futures from one list to another, not waiting for the produce to actually finished. is that correct?
There was a problem hiding this comment.
execution_duration is how long the task function takes to execute, futures_duration is how long it took for all of a tasks producer futures to be marked as done. Since tasks that produced messages don't go into processed_tasks until all their futures are done, this e2e time metric tracks how long that took.
| inflight=task.inflight, | ||
| next_state=task.status, | ||
| execution_start_time=task.execution_start_time, | ||
| execution_end_time=task.futures_start_time, |
There was a problem hiding this comment.
looks like you're already passing in futures_start_time
There was a problem hiding this comment.
In this branch of the code, tasks have been sitting in pending_task_futures waiting for their producer futures to finish (so execution_end_time == futures_start_time, which is when the tasks were added to pending_task_futures).
Here is where we call _task_execution_complete() when a task had no pending futures, so a different value (the current time) is passed in as execution_end_time.
I think this module is due for a bit of a refactor to clean up the branching logic, but I'd rather do that in a separate PR.
| "processing_pool": processing_pool_name, | ||
| "taskbroker_host": taskbroker_host, | ||
| }, | ||
| ) |
There was a problem hiding this comment.
it seems to me that what we're measuring here is the additional time it takes to move futures from one list to another, not waiting for the produce to actually finished. is that correct?
There was a problem hiding this comment.
approving to unblock
i think the histogram vs gauge thing might prevent you from getting insights. but if you feel like it's good enough for your purposes to see workerchild's metrics muddled together per-pool (and I think it'll be fine), then i'd rather unblock for now and revisit later (i can see this discussion dragging on for too long, especially since it concerns cardinality)
and the other thing about which latency is the right one: i also don't think we need to resolve this conversation in this PR
it's important to me that we get to a state where we can safely roll out producer futures in existing high-scale tasks (and also for tasks that we will create from legacy consumers), and for that we need the investigation to continue. otherwise legacy consumers project will soon get to a point where we can't port more consumers either. everything else is irrelevant right now.
c995b96 to
6941464
Compare


This PR adds more observability to TaskProducer (and the worker child) to help troubleshoot why some producer futures never got marked as done.
Specifically:
_pending_futuresfrom a single queue to a dict keyed by producer name, so we can break down worker child metrics by producere2emetric that records both of these put together