From de899fbba68b29be2de1b15b6b7d478e5e216396 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 25 Oct 2022 12:06:36 -0400 Subject: [PATCH 1/3] Only use opened queue slots on running workers --- distributed/scheduler.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index cf240240cfb..79c638d3e5a 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7878,7 +7878,11 @@ def _exit_processing_common( state.release_resources(ts, ws) # If a slot has opened up for a queued task, schedule it. - if state.queued and not _worker_full(ws, state.WORKER_SATURATION): + if ( + state.queued + and ws.status == Status.running + and not _worker_full(ws, state.WORKER_SATURATION) + ): qts = state.queued.peek() if state.validate: assert qts.state == "queued", qts.state From 3b9d1272922ef1804ce422d99531035ef4555f96 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 28 Oct 2022 11:36:37 -0600 Subject: [PATCH 2/3] add test --- distributed/tests/test_scheduler.py | 36 +++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 7df6796066c..de2e512d2c3 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -408,6 +408,42 @@ async def test_queued_remove_add_worker(c, s, a, b): await wait(fs) +@gen_cluster( + client=True, + nthreads=[("", 1)], + config={ + "distributed.scheduler.worker-saturation": 1.0, + "distributed.worker.memory.pause": False, + "distributed.worker.memory.target": False, + "distributed.worker.memory.spill": False, + }, +) +async def test_queued_dont_try_non_running_worker(c, s, a): + "When a slot opens on a non-running worker, don't consider scheduling a queued task" + events = [Event() for _ in range(5)] + fs = c.map(lambda ev: ev.wait(), events, key=[f"w-{i}" for i in range(len(events))]) + + await async_wait_for(lambda: s.queued, timeout=5) + + a.status = Status.paused + + await async_wait_for(lambda: not s.running, timeout=5) + + assert len(a.state.executing) == 1 + a_key: str = next(iter(a.state.executing)).key + a_task = s.tasks[a_key] + a_event = events[int(a_key[2])] + + front_of_queue = s.queued.peek() + + assert a_task.state == "processing" + await a_event.set() + await wait_for_state(a_key, "memory", s) + + story = s.story(front_of_queue) + assert story[-1][1:2] != ["queued", "queued"], story + + @pytest.mark.parametrize( "saturation_config, expected_task_counts", [ From 29cf61583269925566c5ce8236b163680875f63e Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 28 Oct 2022 12:06:18 -0600 Subject: [PATCH 3/3] driveby: remove extraneous validation --- distributed/scheduler.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 79c638d3e5a..43f68b69d47 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2167,10 +2167,7 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None: _task_slots_available(ws, self.WORKER_SATURATION), ) assert ws in self.running, (ws, self.running) - - if self.validate and ws is not None: assert self.workers.get(ws.address) is ws - assert ws in self.running, (ws, self.running) return ws