Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions clients/python/src/taskbroker_client/worker/producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import atexit
from collections import deque
from collections import defaultdict, deque
from collections.abc import Callable
from concurrent.futures import Future
from typing import Any, Sequence
Expand All @@ -14,10 +14,10 @@

# This is global as TaskWorker needs to be able to call TaskProducer.collect_futures()
# without a reference to a task's specific instance of TaskProducer.
# Has a max_len to prevent unbounded future growth if TaskProducer.collect_futures()
# is never called.
_pending_futures: deque[ProducerFuture[BrokerValue[KafkaPayload]]] = deque(
maxlen=TASK_PRODUCER_MAX_PENDING_FUTURES
# Keys are the names of each `TaskProducer` instance in the current process, values are
# deques with a maxlen to prevent unbounded queue size if `collect_futures()` is never called.
_pending_futures: defaultdict[str, deque[ProducerFuture[BrokerValue[KafkaPayload]]]] = defaultdict(
lambda: deque(maxlen=TASK_PRODUCER_MAX_PENDING_FUTURES)
)


Expand Down Expand Up @@ -55,18 +55,21 @@ def _get(self) -> CloseableProducerProtocol:
return self._inner_producer

def track_future(self, future: ProducerFuture[BrokerValue[KafkaPayload]]) -> None:
_pending_futures.append(future)
_pending_futures[self.name].append(future)
self.metrics.gauge(
"task.producer.pending.futures",
len(_pending_futures),
len(_pending_futures[self.name]),
tags={"producer_name": self.name},
)

@staticmethod
def collect_futures() -> set[ProducerFuture[BrokerValue[KafkaPayload]]]:
futures = _pending_futures.copy()
def collect_futures() -> dict[str, set[ProducerFuture[BrokerValue[KafkaPayload]]]]:
"""
Clears the `_pending_futures` dict, and returns a copy with all values converted to sets.
"""
pending_copy = _pending_futures.copy()
_pending_futures.clear()
return set(futures)
return {name: set(val) for name, val in pending_copy.items()}

def produce(
self,
Expand Down
85 changes: 73 additions & 12 deletions clients/python/src/taskbroker_client/worker/workerchild.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ActivationWithPendingFutures:
status: TaskActivationStatus.ValueType
execution_start_time: float
futures_start_time: float
pending_futures: set[ProducerFuture[BrokerValue[KafkaPayload]]]
pending_futures: dict[str, set[ProducerFuture[BrokerValue[KafkaPayload]]]]
task_func: Task[Any, Any]


Expand Down Expand Up @@ -263,35 +263,50 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None:
and submitted for retry (if the policy allows).
"""
RESULT_TIMEOUT_SEC = 1
error_name: str = "N/A"
try:
# We don't care about the actual result value,
# we just care if result() raises or not
[f.result(RESULT_TIMEOUT_SEC) for f in task.pending_futures]
[f.result(RESULT_TIMEOUT_SEC) for fut in task.pending_futures.values() for f in fut]
produce_status = "success"
# If any pending producer futures failed, retry the task
except Exception:
except Exception as e:
task.status = TASK_ACTIVATION_STATUS_FAILURE
if task.task_func.retry:
retry_state = task.inflight.activation.retry_state
if not task.task_func.retry.max_attempts_reached(retry_state):
task.status = TASK_ACTIVATION_STATUS_RETRY
produce_status = "failure"
error_name = type(e).__name__
metrics.incr(
"taskworker.worker.produce.result",
tags={
"status": produce_status,
"processing_pool": processing_pool_name,
"error_name": error_name,
},
)
pending_task_futures.remove(task)
_task_execution_complete(
inflight=task.inflight,
next_state=task.status,
execution_start_time=task.execution_start_time,
execution_end_time=task.futures_start_time,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like you're already passing in futures_start_time

@bmckerry bmckerry Jun 18, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this branch of the code, tasks have been sitting in pending_task_futures waiting for their producer futures to finish (so execution_end_time == futures_start_time, which is when the tasks were added to pending_task_futures).

Here is where we call _task_execution_complete() when a task had no pending futures, so a different value (the current time) is passed in as execution_end_time.

I think this module is due for a bit of a refactor to clean up the branching logic, but I'd rather do that in a separate PR.

task_func=task.task_func,
futures_start_time=task.futures_start_time,
)

def get_oldest_pending_activation() -> ActivationWithPendingFutures | None:
    """
    Return the activation in `pending_task_futures` with the earliest `futures_start_time`.

    Returns `None` only when `pending_task_futures` is empty.
    """
    # Pick the smallest start time directly instead of comparing computed ages against
    # a 0.0 sentinel: an activation enqueued within the current clock tick (or after the
    # wall clock stepped backwards) has an age <= 0 and would otherwise never be selected.
    return min(
        pending_task_futures,
        key=lambda task: task.futures_start_time,
        default=None,
    )

def check_task_future_completion() -> None:
if len(pending_task_futures) > 0:
# Records how many activations with pending producer futures
Expand All @@ -304,8 +319,31 @@ def check_task_future_completion() -> None:
},
)
for task in pending_task_futures.copy():
if all([f.done() for f in task.pending_futures]):
future_status = [f.done() for fut in task.pending_futures.values() for f in fut]
if all(future_status):
await_task_futures(task)
else:
# How many futures are still pending in this task
metrics.distribution(
"taskworker.task.incomplete_futures",
len([f for f in future_status if not f]),
tags={
"processing_pool": processing_pool_name,
"namespace": task.inflight.activation.namespace,
"taskname": task.inflight.activation.taskname,
},
)
# How long has the oldest pending task been sitting in the queue
if oldest := get_oldest_pending_activation():
metrics.distribution(
"taskworker.worker.oldest_pending_activation_age",
time.time() - oldest.futures_start_time,
tags={
"processing_pool": processing_pool_name,
"namespace": oldest.inflight.activation.namespace,
"taskname": oldest.inflight.activation.taskname,
},
)

while not shutdown_event.is_set() and not local_shutdown.is_set():
if max_task_count and processed_task_count >= max_task_count:
Expand Down Expand Up @@ -463,16 +501,30 @@ def check_task_future_completion() -> None:
# If the task function itself failed, we don't need to await any
# producer futures since it'll be retried anyways
if next_state != TASK_ACTIVATION_STATUS_COMPLETE:
task_produced_futures = set()
task_produced_futures = {}

if len(task_produced_futures) == 0:
_task_execution_complete(
inflight,
next_state,
execution_start_time,
time.time(),
task_func,
)
else:
for name, futures in task_produced_futures.items():
# How many futures were produced in the executed task,
# tagged by producer name
metrics.gauge(
"taskworker.task.futures_produced",
len(futures),
tags={
"producer_name": name,
"processing_pool": processing_pool_name,
"namespace": inflight.activation.namespace,
"taskname": inflight.activation.taskname,
},
)
pending_task = ActivationWithPendingFutures(
inflight=inflight,
status=next_state,
Expand Down Expand Up @@ -582,9 +634,7 @@ def record_task_execution(
task_added_time = activation.received_at.ToDatetime().timestamp()
execution_duration = completion_time - start_time
execution_latency = completion_time - task_added_time
futures_duration = (
completion_time - futures_enqueued_time if futures_enqueued_time else None
)
futures_duration = time.time() - futures_enqueued_time if futures_enqueued_time else 0

logger.debug(
"taskworker.task_execution",
Expand Down Expand Up @@ -625,7 +675,7 @@ def record_task_execution(
"taskbroker_host": taskbroker_host,
},
)
if futures_duration:
if futures_duration != 0:
metrics.distribution(
"taskworker.worker.future_completion_duration",
futures_duration,
Expand All @@ -636,6 +686,18 @@ def record_task_execution(
"taskbroker_host": taskbroker_host,
},
)
# Latency between task execution start and all producer futures completing
# (i.e. how long it took to put the task in the processed_tasks queue)
metrics.distribution(
"taskworker.worker.e2e_latency",
(execution_duration + futures_duration),
tags={
"namespace": activation.namespace,
"taskname": activation.taskname,
"processing_pool": processing_pool_name,
"taskbroker_host": taskbroker_host,
},
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kafka wait omitted from latency

Medium Severity

For activations that await producer futures, record_task_execution still derives execution_latency, COGS usage, and Sentry check-in duration from completion_time, which is now futures_start_time rather than wall-clock completion. Time spent waiting on Kafka futures is dropped from those values even though e2e_latency includes it.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit c6f52d0. Configure here.

@bmckerry bmckerry Jun 17, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how we'd want to handle these. In theory, tasks aren't "complete" until all produced futures are complete, but I still see task function execution time and future awaiting time as separate things we should track.

Open to advice here

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that what we're measuring here is the additional time it takes to move futures from one list to another, not the time spent waiting for the produce to actually finish. Is that correct?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

execution_duration is how long the task function takes to execute; futures_duration is how long it took for all of a task's producer futures to be marked as done. Since tasks that produced messages don't go into processed_tasks until all their futures are done, this e2e time metric tracks how long that took.


namespace = app.get_namespace(activation.namespace)
metrics.incr(
Expand Down Expand Up @@ -663,11 +725,10 @@ def _task_execution_complete(
inflight: InflightTaskActivation,
next_state: TaskActivationStatus.ValueType,
execution_start_time: float,
execution_end_time: float,
task_func: Task[Any, Any] | None,
futures_start_time: float | None = None,
) -> None:
# Get completion time before pushing to queue, so we can measure queue append time
execution_complete_time = time.time()
with metrics.timer(
"taskworker.worker.processed_tasks.put.duration",
tags={
Expand Down Expand Up @@ -709,7 +770,7 @@ def _task_execution_complete(
inflight.activation,
next_state,
execution_start_time,
execution_complete_time,
execution_end_time,
processing_pool_name,
inflight.host,
futures_start_time,
Expand Down
8 changes: 5 additions & 3 deletions clients/python/tests/worker/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def test_producer_tracks_futures() -> None:
producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True))
producer.produce(Topic("test"), make_kafka_payload())
assert len(_pending_futures) == 1
future = next(iter(TaskProducer.collect_futures()))
collected = TaskProducer.collect_futures()
future = next(iter(collected["test.producer"]))
assert future.result() == make_broker_value()
assert len(_pending_futures) == 0

Expand All @@ -70,7 +71,8 @@ def callback(future: Future[BrokerValue[KafkaPayload]]) -> None:
received.append(future)

producer.produce(Topic("test"), make_kafka_payload(), callbacks=[callback])
tracked_future = next(iter(TaskProducer.collect_futures()))
collected = TaskProducer.collect_futures()
tracked_future = next(iter(collected["test.producer"]))

assert len(received) == 1
assert received[0] is tracked_future
Expand All @@ -91,4 +93,4 @@ def test_pending_futures_max_len() -> None:
producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True))
for _ in range(10001):
producer.produce(Topic("test"), make_kafka_payload())
assert len(_pending_futures) == 10000
assert len(_pending_futures["test.producer"]) == 10000
16 changes: 11 additions & 5 deletions clients/python/tests/worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1503,7 +1503,7 @@ def test_child_process_tracks_producer_futures(

todo.put(task)
with mock.patch.object(
TaskProducer, "collect_futures", return_value={done_future}
TaskProducer, "collect_futures", return_value={"test.producer": {done_future}}
) as collect_mock:
child_process(
"examples.app:app",
Expand Down Expand Up @@ -1549,7 +1549,9 @@ def observe_and_resolve() -> None:
observer = threading.Thread(target=observe_and_resolve, name="future-observer")
observer.start()
try:
with mock.patch.object(TaskProducer, "collect_futures", return_value={pending_future}):
with mock.patch.object(
TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}}
):
child_process(
"examples.app:app",
todo,
Expand Down Expand Up @@ -1593,7 +1595,9 @@ def deliver_sigterm() -> None:
sigterm_thread = threading.Thread(target=deliver_sigterm, name="sigterm-sender")
sigterm_thread.start()
try:
with mock.patch.object(TaskProducer, "collect_futures", return_value={pending_future}):
with mock.patch.object(
TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}}
):
child_process(
"examples.app:app",
todo,
Expand Down Expand Up @@ -1639,7 +1643,9 @@ def test_child_process_retries_on_failed_future(
failed_future.set_exception(RuntimeError("kafka produce failed"))

todo.put(retriable_task)
with mock.patch.object(TaskProducer, "collect_futures", return_value={failed_future}):
with mock.patch.object(
TaskProducer, "collect_futures", return_value={"test.producer": {failed_future}}
):
child_process(
"examples.app:app",
todo,
Expand All @@ -1660,7 +1666,7 @@ def test_child_process_clears_pending_futures_when_task_fails(
) -> None:
leftover_future: Future[BrokerValue[KafkaPayload]] = Future()
leftover_future.set_result(_make_broker_value())
_pending_futures.append(leftover_future)
_pending_futures["test.producer"].append(leftover_future)
assert len(_pending_futures) == 1

todo: queue.Queue[InflightTaskActivation] = queue.Queue()
Expand Down
Loading