diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 0a441e118fa..11dcd84474e 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1676,6 +1676,23 @@ async def test_update_latency(cleanup): assert w.digests["latency"].size() > 0 +@pytest.mark.asyncio +async def test_heartbeat_executing(cleanup): + async with await Scheduler() as s: + async with await Worker(s.address) as w: + async with Client(s.address, asynchronous=True) as c: + ws = s.workers[w.address] + # Initially there are no active tasks + assert not ws.metrics["executing"] + # Submit a task and ensure the worker's heartbeat includes the task + # in its executing metric + f = c.submit(slowinc, 1, delay=1) + while not ws.metrics["executing"]: + await w.heartbeat() + assert f.key in ws.metrics["executing"] + await f + + @pytest.mark.asyncio @pytest.mark.parametrize("reconnect", [True, False]) async def test_heartbeat_comm_closed(cleanup, monkeypatch, reconnect): diff --git a/distributed/worker.py b/distributed/worker.py index a682beadcd7..5e281967ed3 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -126,6 +126,10 @@ class TaskState: The number of times a dependency has not been where we expected it * **startstops**: ``[{startstop}]`` Log of transfer, load, and compute times for a task + * **start_time**: ``float`` + Time at which task begins running + * **stop_time**: ``float`` + Time at which task finishes running * **metadata**: ``dict`` Metadata related to task. Stored metadata should be msgpack serializable (e.g. int, string, list, dict). 
@@ -159,6 +163,8 @@ def __init__(self, key, runspec=None): self.type = None self.suspicious_count = 0 self.startstops = list() + self.start_time = None + self.stop_time = None self.metadata = {} def __repr__(self): @@ -767,8 +773,8 @@ def local_dir(self): return self.local_directory async def get_metrics(self): + now = time() core = dict( - executing=self.executing_count, in_memory=len(self.data), ready=len(self.ready), in_flight=self.in_flight_tasks, @@ -777,6 +783,10 @@ async def get_metrics(self): "workers": dict(self.bandwidth_workers), "types": keymap(typename, self.bandwidth_types), }, + executing={ + key: now - self.tasks[key].start_time + for key in self.active_threads.values() + }, ) custom = {} for k, metric in self.metrics.items(): @@ -2316,11 +2326,16 @@ def executor_submit(self, key, function, args=(), kwargs=None, executor=None): pc = PeriodicCallback( lambda: logger.debug("future state: %s - %s", key, future._state), 1000 ) + ts = self.tasks.get(key) + if ts is not None: + ts.start_time = time() pc.start() try: yield future finally: pc.stop() + if ts is not None: + ts.stop_time = time() result = future.result()