From 79e8b9ac72a02f13d9c026ef1216ac4e723d960c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 27 Sep 2022 11:07:36 -0600 Subject: [PATCH 01/24] Scheduler: process new recommendations immediately Testing this change in CI to see what fails. Subtle but significant: _all_ recommendations from a transition will be processed on the next cycle. Previously, only recommendations for the current key, or for keys with no recommendations yet, would be processed on the next cycle. If a key already had a recommendation, the recommendation would be updated, but still processed in the original (priority) order. --- distributed/scheduler.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index c5f4c0fba6b..d20b1b76611 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1705,6 +1705,10 @@ def _transitions( new = self._transition(key, finish, stimulus_id) new_recs, new_cmsgs, new_wmsgs = new + # Put recommendations at end of dict, so they're processed in the next cycle + for k in new_recs: + if k != key: + recommendations.pop(k, None) recommendations.update(new_recs) for c, new_msgs in new_cmsgs.items(): msgs = client_msgs.get(c) From 5e8e003f91daf217018bde07b60b44778ba4d5ab Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 27 Sep 2022 12:57:19 -0600 Subject: [PATCH 02/24] `update_msgs` helper --- distributed/scheduler.py | 51 ++++++++++++---------------------------- 1 file changed, 15 insertions(+), 36 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d20b1b76611..9aecb9ba826 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1600,32 +1600,12 @@ def _transition( b_recs, b_cmsgs, b_wmsgs = func(self, key, stimulus_id) recommendations.update(a_recs) - for c, new_msgs in a_cmsgs.items(): - msgs = client_msgs.get(c) - if msgs is not None: - msgs.extend(new_msgs) - else: - client_msgs[c] = new_msgs - for w, new_msgs in a_wmsgs.items(): - msgs = worker_msgs.get(w) - if msgs is not None: - msgs.extend(new_msgs) - else: - worker_msgs[w] = new_msgs + update_msgs(client_msgs, a_cmsgs) + update_msgs(worker_msgs, a_wmsgs) recommendations.update(b_recs) - for c, new_msgs in b_cmsgs.items(): - msgs = client_msgs.get(c) - if msgs is not None: - msgs.extend(new_msgs) - else: - client_msgs[c] = new_msgs - for w, new_msgs in b_wmsgs.items(): - msgs = worker_msgs.get(w) - if msgs is not None: - msgs.extend(new_msgs) - else: - worker_msgs[w] = new_msgs + update_msgs(client_msgs, b_cmsgs) + update_msgs(worker_msgs, b_wmsgs) start = "released" else: @@ -1710,18 +1690,8 @@ def _transitions( if k != key: recommendations.pop(k, None) recommendations.update(new_recs) - for c, new_msgs in new_cmsgs.items(): - msgs = client_msgs.get(c) - if msgs is not None: - msgs.extend(new_msgs) - else: - client_msgs[c] = new_msgs - for w, new_msgs in new_wmsgs.items(): - msgs = worker_msgs.get(w) - if msgs is not None: - msgs.extend(new_msgs) - else: - worker_msgs[w] = new_msgs + update_msgs(client_msgs, new_cmsgs) + update_msgs(worker_msgs, new_wmsgs) if self.validate: # FIXME downcast antipattern @@ -8209,6 +8179,15 @@ def _worker_full(ws: WorkerState, saturation_factor: float) -> bool: return _task_slots_available(ws, saturation_factor) <= 0 +def update_msgs(msgs: dict[str, list[Any]], new: dict[str, list[Any]]) -> None: + for k, new_msgs in new.items(): + m = msgs.get(k) + if m is not None: + m.extend(new_msgs) + else: + msgs[k] = new_msgs + + class KilledWorker(Exception): def __init__(self, task: str, last_worker: WorkerState, allowed_failures: int): super().__init__(task, last_worker, allowed_failures) From 06ba0157af57e35134285e28b308058226ed3a10 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 27 Sep 2022 13:01:25 -0600 Subject: [PATCH 03/24] first hacky pass at structural co-assignment the linear chain traversal of the family metric isn't good. more broadly, we need to think about what states we expect tasks in families to be in. almost certainly, family should be a data structure that's maybe populated top-down in `update_graph`. the only reason we're able to get away with it not being a structure right now is because we assume that all tasks in a family are either not scheduled, or all in processing. there are graph structures where that's probably impossible. --- distributed/scheduler.py | 88 ++++++++++++++++++++++++++++++++++------ 1 file changed, 76 insertions(+), 12 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9aecb9ba826..3aa531ee4d8 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1912,20 +1912,16 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None: # All workers busy? Task gets/stays queued. return None - # Just pick the least busy worker. - # NOTE: this will lead to worst-case scheduling with regards to co-assignment. + # Pick the least busy worker. ws = min(self.idle.values(), key=lambda ws: len(ws.processing) / ws.nthreads) if self.validate: + assert self.workers.get(ws.address) is ws assert not _worker_full(ws, self.WORKER_SATURATION), ( ws, _task_slots_available(ws, self.WORKER_SATURATION), ) assert ws in self.running, (ws, self.running) - if self.validate and ws is not None: - assert self.workers.get(ws.address) is ws - assert ws in self.running, (ws, self.running) - return ws def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None: @@ -2016,6 +2012,7 @@ def transition_waiting_processing(self, key, stimulus_id): else: if not (ws := self.decide_worker_rootish_queuing_enabled()): return {ts.key: "queued"}, {}, {} + return _queueable_to_processing(self, ts, ws) else: if not (ws := self.decide_worker_non_rootish(ts)): return {ts.key: "no-worker"}, {}, {} @@ -2634,9 +2631,6 @@ def transition_queued_released(self, key, stimulus_id): def transition_queued_processing(self, key, stimulus_id): try: ts: TaskState = self.tasks[key] - recommendations: Recs = {} - client_msgs: dict = {} - worker_msgs: dict = {} if self.validate: assert not ts.actor, f"Actors can't be queued: {ts}" @@ -2644,10 +2638,10 @@ def transition_queued_processing(self, key, stimulus_id): if ws := self.decide_worker_rootish_queuing_enabled(): self.queued.discard(ts) - worker_msgs = _add_to_processing(self, ts, ws) - # If no worker, task just stays `queued` + return _queueable_to_processing(self, ts, ws) - return recommendations, client_msgs, worker_msgs + # If no worker, task just stays `queued` + return {}, {}, {} except Exception as e: logger.exception(e) if LOG_PDB: @@ -7682,6 +7676,45 @@ def _validate_ready(state: SchedulerState, ts: TaskState) -> None: assert all(dts.who_has for dts in ts.dependencies) +def _queueable_to_processing( + state: SchedulerState, ts: TaskState, ws: WorkerState +) -> tuple[Recs, dict[str, list[Any]], dict[str, list[Any]]]: + "Common logic for transitioning a queueable (root-ish) task to processing, along with the rest of its family" + recommendations: Recs = {} + worker_msgs = _add_to_processing(state, ts, ws) + + fam = family(ts, 20) # TODO maxsize as config/what?! + if fam: + # Schedule other tasks that will all need to be in memory + # with this task on the same worker at once. + + # TODO what about when the first task in a family that's already on a worker completes, + # but not the whole family? We want to wait for the whole family to be done before assigning + # another one. We don't want to take a slot that could be used for the downstream task in the future. + + # If workers could be responsible with memory, this would be okay. Because if everything in the previous + # family is done execpt one input to the downstream, then yes, we might as well get started on a new family + # while we're waiting, as long as we have the memory capacity to do so. + for fts in sorted(fam, key=operator.attrgetter("priority")): + # FIXME these tasks may not all necessarily be runnable. + # Or maybe some are even in memory? + # When does this happen, and how should we handle them? + if fts is ts: + continue + + if fts.state == "queued": + if state.validate: + assert fts in state.queued + state.queued.discard(fts) + update_msgs(worker_msgs, _add_to_processing(state, fts, ws)) + + # This recommendation will be a no-op. It's just to remove any existing + # recommendation for the key from the recommendations queue. + recommendations[fts.key] = "processing" + + return recommendations, {}, worker_msgs + + def _add_to_processing( state: SchedulerState, ts: TaskState, ws: WorkerState ) -> dict[str, list]: @@ -8179,6 +8212,37 @@ def _worker_full(ws: WorkerState, saturation_factor: float) -> bool: return _task_slots_available(ws, saturation_factor) <= 0 +def family(ts: TaskState, maxsize: int) -> set[TaskState] | None: + if len(ts.dependents) > maxsize: + return None + family = set() + for child in ts.dependents: # TODO even support multiple dependents? + # Traverse linear chains + # TODO something more efficient + while len(deps := child.dependencies) == 1 and len(cd := child.dependents) == 1: + child = next(iter(cd)) + + if len(deps) == 1: + # Faster path: nothing but linear chains + continue + + if len(deps) < maxsize: + # Traverse _back_ to root tasks + # FIXME totally wrong, just a hack only for the test case + for cts in deps: + while len(cts.dependencies) == 1: + ncts = next(iter(cts.dependencies)) + if len(ncts.dependents) == 1: + cts = ncts + else: + break + + family.add(cts) + if len(family) > maxsize: + return None + return family + + def update_msgs(msgs: dict[str, list[Any]], new: dict[str, list[Any]]) -> None: for k, new_msgs in new.items(): m = msgs.get(k) From 0fa13c428243e1f2fcba2cc64f0d5c8b02ad2363 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 27 Sep 2022 17:07:52 -0600 Subject: [PATCH 04/24] wip --- distributed/scheduler.py | 26 ++++++++++++++------------ distributed/tests/test_scheduler.py | 23 ++++++++++++++++++++--- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3aa531ee4d8..9fdcb4e6de5 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -8219,25 +8219,27 @@ def family(ts: TaskState, maxsize: int) -> set[TaskState] | None: for child in ts.dependents: # TODO even support multiple dependents? # Traverse linear chains # TODO something more efficient - while len(deps := child.dependencies) == 1 and len(cd := child.dependents) == 1: - child = next(iter(cd)) + while len(siblings := child.dependencies) == 1 and len(gc := child.dependents) == 1: + child = next(iter(gc)) - if len(deps) == 1: + if len(siblings) == 1: # Faster path: nothing but linear chains continue - if len(deps) < maxsize: - # Traverse _back_ to root tasks - # FIXME totally wrong, just a hack only for the test case - for cts in deps: - while len(cts.dependencies) == 1: - ncts = next(iter(cts.dependencies)) - if len(ncts.dependents) == 1: - cts = ncts + if len(siblings) < maxsize: + for sts in siblings: + if sts is ts: + continue + # Traverse linear chains _back_ to root tasks + # FIXME totally wrong, just a hack only for the test case + while len(sts.dependencies) == 1: + nsts = next(iter(sts.dependencies)) + if len(nsts.dependents) == 1: + sts = nsts else: break - family.add(cts) + family.add(sts) if len(family) > maxsize: return None return family diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 1df0ceb1619..c540965fa54 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -40,7 +40,7 @@ from distributed.core import ConnectionPool, Status, clean_exception, connect, rpc from distributed.metrics import time from distributed.protocol.pickle import dumps, loads -from distributed.scheduler import KilledWorker, MemoryState, Scheduler, WorkerState +from distributed.scheduler import KilledWorker, MemoryState, Scheduler, WorkerState, family from distributed.utils import TimeoutError from distributed.utils_test import ( BrokenComm, @@ -150,7 +150,8 @@ def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads): nthreads=nthreads, config={ "distributed.scheduler.work-stealing": False, - "distributed.scheduler.worker-saturation": float("inf"), + # "distributed.scheduler.worker-saturation": float("inf"), + "distributed.scheduler.worker-saturation": 1.0, }, ) async def test_decide_worker_coschedule_order_neighbors_(c, s, *workers): @@ -200,6 +201,7 @@ def random(**kwargs): **trivial_deps, ) + # dask.visualize(x, x.sum(axis=1, split_every=20), optimize_graph=True) xx, xsum = dask.persist(x, x.sum(axis=1, split_every=20)) await xsum @@ -251,7 +253,22 @@ def random(**kwargs): test_decide_worker_coschedule_order_neighbors_() -@pytest.mark.slow +@gen_cluster(nthreads=[], client=True) +async def test_family(c, s): + ax = [delayed(i, name=f"a-{i}") for i in range(8)] + bx = [delayed(i, name=f"b-{i}") for i in range(8)] + cx = [delayed(i, name=f"c-{i}") for i in range(8)] + + zs = [a + b + c for a, b, c in zip(ax, bx, cx)] + + fs = c.compute(zs) + await async_wait_for(s.tasks, 5) + + a1 = s.tasks['a-1'] + + a1_fam = family(a1, 1000) + + @gen_cluster( nthreads=[("", 2)] * 4, client=True, From c0b169fccddd2886a49c8780d80010ad6ceb613e Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 27 Sep 2022 22:57:32 -0600 Subject: [PATCH 05/24] some madness taking structuralism too far? --- distributed/scheduler.py | 159 ++++++++++++++++++++++++++------------- 1 file changed, 106 insertions(+), 53 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9fdcb4e6de5..072717ceec7 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -17,7 +17,7 @@ import uuid import warnings import weakref -from collections import defaultdict, deque +from collections import Counter, defaultdict, deque from collections.abc import ( Callable, Collection, @@ -32,7 +32,7 @@ from contextlib import suppress from functools import partial from numbers import Number -from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast, overload +from typing import TYPE_CHECKING, Any, ClassVar, Literal, NoReturn, cast, overload import psutil from sortedcontainers import SortedDict, SortedSet @@ -961,6 +961,8 @@ class TaskState: #: further in :doc:`Scheduling Policy `. priority: tuple[int, ...] + planned_on: WorkerState | None + # Attribute underlying the state property _state: TaskStateState @@ -1153,6 +1155,7 @@ def __init__(self, key: str, run_spec: object, state: TaskStateState): self.waiting_on = set() self.waiters = set() self.who_has = set() + self.planned_on = None self.processing_on = None self.has_lost_dependencies = False self.host_restrictions = None # type: ignore @@ -1924,6 +1927,37 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None: return ws + def decide_worker_from_family( + self, family: tuple[set[TaskState], set[TaskState]] | None + ) -> WorkerState | None: + if family is not None: + siblings, children = family + # First see if any downstream tasks are already planned somewhere; use that + candidates: Counter[WorkerState] = Counter() + for cts in children: + if pws := cts.planned_on: + if self.workers.get(pws.address) is pws: + candidates.update((pws,)) + else: + cts.planned_on = None + + if candidates: + return candidates.most_common(1)[0][0] + + # If not, see if any sibling tasks are already planned somewhere; use that + for cts in siblings: + if pws := cts.planned_on: + if self.workers.get(pws.address) is pws: + candidates.update((pws,)) + else: + cts.planned_on = None + + if candidates: + return candidates.most_common(1)[0][0] + + # No family, or nothing is planned anywhere. Pick a new worker. + return self.decide_worker_rootish_queuing_enabled() + def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None: """Pick a worker for a runnable non-root task, considering dependencies and restrictions. @@ -2010,9 +2044,7 @@ def transition_waiting_processing(self, key, stimulus_id): if not (ws := self.decide_worker_rootish_queuing_disabled(ts)): return {ts.key: "no-worker"}, {}, {} else: - if not (ws := self.decide_worker_rootish_queuing_enabled()): - return {ts.key: "queued"}, {}, {} - return _queueable_to_processing(self, ts, ws) + return _queueable_to_processing(self, ts) else: if not (ws := self.decide_worker_non_rootish(ts)): return {ts.key: "no-worker"}, {}, {} @@ -2636,12 +2668,7 @@ def transition_queued_processing(self, key, stimulus_id): assert not ts.actor, f"Actors can't be queued: {ts}" assert ts in self.queued - if ws := self.decide_worker_rootish_queuing_enabled(): - self.queued.discard(ts) - return _queueable_to_processing(self, ts, ws) - - # If no worker, task just stays `queued` - return {}, {}, {} + return _queueable_to_processing(self, ts) except Exception as e: logger.exception(e) if LOG_PDB: @@ -7677,40 +7704,62 @@ def _validate_ready(state: SchedulerState, ts: TaskState) -> None: def _queueable_to_processing( - state: SchedulerState, ts: TaskState, ws: WorkerState + state: SchedulerState, ts: TaskState ) -> tuple[Recs, dict[str, list[Any]], dict[str, list[Any]]]: "Common logic for transitioning a queueable (root-ish) task to processing, along with the rest of its family" recommendations: Recs = {} - worker_msgs = _add_to_processing(state, ts, ws) + worker_msgs: dict[str, list[Any]] = {} fam = family(ts, 20) # TODO maxsize as config/what?! - if fam: - # Schedule other tasks that will all need to be in memory - # with this task on the same worker at once. - - # TODO what about when the first task in a family that's already on a worker completes, - # but not the whole family? We want to wait for the whole family to be done before assigning - # another one. We don't want to take a slot that could be used for the downstream task in the future. - - # If workers could be responsible with memory, this would be okay. Because if everything in the previous - # family is done execpt one input to the downstream, then yes, we might as well get started on a new family - # while we're waiting, as long as we have the memory capacity to do so. - for fts in sorted(fam, key=operator.attrgetter("priority")): - # FIXME these tasks may not all necessarily be runnable. - # Or maybe some are even in memory? - # When does this happen, and how should we handle them? - if fts is ts: - continue + if ws := state.decide_worker_from_family(fam): + worker_msgs = _add_to_processing(state, ts, ws) + if fam: + siblings, children = fam + + # Schedule other tasks that will all need to be in memory + # with this task on the same worker at once. + + # TODO what about when the first task in a family that's already on a worker completes, + # but not the whole family? We want to wait for the whole family to be done before assigning + # another one. We don't want to take a slot that could be used for the downstream task in the future. + + # If workers could be responsible with memory, this would be okay. Because if everything in the previous + # family is done execpt one input to the downstream, then yes, we might as well get started on a new family + # while we're waiting, as long as we have the memory capacity to do so. + for fts in sorted(siblings, key=operator.attrgetter("priority")): + # FIXME these tasks may not all necessarily be runnable. + # Or maybe some are even in memory? + # When does this happen, and how should we handle them? + assert fts is not ts + assert fts.planned_on is None or fts.planned_on is ws, ( + fts.planned_on, + ws, + ) + + if fts.state == "processing": + assert fts.processing_on is ws, (fts.processing_on, ws) + + if fts.state == "memory": + assert ws in fts.who_has, (fts.who_has, ws) + + if fts.state == "queued": + if state.validate: + assert fts in state.queued + state.queued.discard(fts) + + if fts.state in ("released", "waiting", "queued"): + fts.planned_on = ws + + if not fts.waiting_on: # wtf if it's released?? + update_msgs(worker_msgs, _add_to_processing(state, fts, ws)) - if fts.state == "queued": - if state.validate: - assert fts in state.queued - state.queued.discard(fts) - update_msgs(worker_msgs, _add_to_processing(state, fts, ws)) + # This recommendation will be a no-op. It's just to remove any existing + # recommendation for the key from the recommendations queue. + recommendations[fts.key] = "processing" - # This recommendation will be a no-op. It's just to remove any existing - # recommendation for the key from the recommendations queue. - recommendations[fts.key] = "processing" + for cts in children: + if cts.planned_on is None: + cts.planned_on = ws return recommendations, {}, worker_msgs @@ -7726,6 +7775,7 @@ def _add_to_processing( assert (o := state.workers.get(ws.address)) is ws, (ws, o) state._set_duration_estimate(ts, ws) + ts.planned_on = None ts.processing_on = ws ts.state = "processing" state.acquire_resources(ts, ws) @@ -8212,37 +8262,40 @@ def _worker_full(ws: WorkerState, saturation_factor: float) -> bool: return _task_slots_available(ws, saturation_factor) <= 0 -def family(ts: TaskState, maxsize: int) -> set[TaskState] | None: +def family(ts: TaskState, maxsize: int) -> tuple[set[TaskState], set[TaskState]] | None: if len(ts.dependents) > maxsize: return None - family = set() + + siblings: set[TaskState] = set() + children: set[TaskState] = set() for child in ts.dependents: # TODO even support multiple dependents? # Traverse linear chains - # TODO something more efficient - while len(siblings := child.dependencies) == 1 and len(gc := child.dependents) == 1: + while len(sibs := child.dependencies) == 1 and len(gc := child.dependents) == 1: child = next(iter(gc)) - if len(siblings) == 1: - # Faster path: nothing but linear chains + children.add(child) + if len(sibs) == 1: + # Faster path: nothing but linear chains, so no siblings besides `ts` continue - if len(siblings) < maxsize: - for sts in siblings: - if sts is ts: - continue + if len(sibs) < maxsize: + for sts in sibs: # Traverse linear chains _back_ to root tasks - # FIXME totally wrong, just a hack only for the test case - while len(sts.dependencies) == 1: + # FIXME probably wrong + while sts is not ts and len(sts.dependencies) == 1: nsts = next(iter(sts.dependencies)) if len(nsts.dependents) == 1: sts = nsts else: break - family.add(sts) - if len(family) > maxsize: + if sts is ts: + continue + + siblings.add(sts) + if len(siblings) > maxsize: return None - return family + return siblings, children def update_msgs(msgs: dict[str, list[Any]], new: dict[str, list[Any]]) -> None: From 7d499a6db2454a8068bdd7aaeb008b75d6981179 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 28 Sep 2022 16:03:12 -0600 Subject: [PATCH 06/24] this might, maybe, actually kinda work --- distributed/scheduler.py | 226 ++++++++++++++------- distributed/tests/test_scheduler.py | 24 +-- distributed/tests/test_scheduler_family.py | 222 ++++++++++++++++++++ 3 files changed, 378 insertions(+), 94 deletions(-) create mode 100644 distributed/tests/test_scheduler_family.py diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 072717ceec7..73e01eb0257 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -32,7 +32,7 @@ from contextlib import suppress from functools import partial from numbers import Number -from typing import TYPE_CHECKING, Any, ClassVar, Literal, NoReturn, cast, overload +from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast, overload import psutil from sortedcontainers import SortedDict, SortedSet @@ -1879,7 +1879,7 @@ def decide_worker_rootish_queuing_disabled( return ws - def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None: + def decide_worker_rootish_queuing_enabled(self) -> WorkerState: """Pick a worker for a runnable root-ish task, if not all are busy. Picks the least-busy worker out of the ``idle`` workers (idle workers have fewer @@ -1910,10 +1910,11 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None: # If `is_rootish` changes to a static definition, then add that assertion here # (and actually pass in the task). assert not math.isinf(self.WORKER_SATURATION) + assert self.idle - if not self.idle: - # All workers busy? Task gets/stays queued. - return None + # if not self.idle: + # # All workers busy? Task gets/stays queued. + # return None # Pick the least busy worker. ws = min(self.idle.values(), key=lambda ws: len(ws.processing) / ws.nthreads) @@ -1929,28 +1930,28 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None: def decide_worker_from_family( self, family: tuple[set[TaskState], set[TaskState]] | None - ) -> WorkerState | None: + ) -> WorkerState: if family is not None: - siblings, children = family + siblings, downstream = family # First see if any downstream tasks are already planned somewhere; use that candidates: Counter[WorkerState] = Counter() - for cts in children: - if pws := cts.planned_on: + for dts in downstream: + if pws := dts.planned_on: if self.workers.get(pws.address) is pws: candidates.update((pws,)) else: - cts.planned_on = None + dts.planned_on = None if candidates: return candidates.most_common(1)[0][0] # If not, see if any sibling tasks are already planned somewhere; use that - for cts in siblings: - if pws := cts.planned_on: + for dts in siblings: + if pws := dts.planned_on: if self.workers.get(pws.address) is pws: candidates.update((pws,)) else: - cts.planned_on = None + dts.planned_on = None if candidates: return candidates.most_common(1)[0][0] @@ -7707,59 +7708,66 @@ def _queueable_to_processing( state: SchedulerState, ts: TaskState ) -> tuple[Recs, dict[str, list[Any]], dict[str, list[Any]]]: "Common logic for transitioning a queueable (root-ish) task to processing, along with the rest of its family" - recommendations: Recs = {} - worker_msgs: dict[str, list[Any]] = {} + # Fastpath: skip family search and everything else if no workers are free. + # Since all siblings are assigned at once, this means `ts` belongs to a family we haven't processed yet. + # This means that we've already filled the cluster with root task families, and shouldn't schedule any more. + if not state.idle: + return {ts.key: "queued"}, {}, {} fam = family(ts, 20) # TODO maxsize as config/what?! - if ws := state.decide_worker_from_family(fam): - worker_msgs = _add_to_processing(state, ts, ws) - if fam: - siblings, children = fam - - # Schedule other tasks that will all need to be in memory - # with this task on the same worker at once. - - # TODO what about when the first task in a family that's already on a worker completes, - # but not the whole family? We want to wait for the whole family to be done before assigning - # another one. We don't want to take a slot that could be used for the downstream task in the future. - - # If workers could be responsible with memory, this would be okay. Because if everything in the previous - # family is done execpt one input to the downstream, then yes, we might as well get started on a new family - # while we're waiting, as long as we have the memory capacity to do so. - for fts in sorted(siblings, key=operator.attrgetter("priority")): - # FIXME these tasks may not all necessarily be runnable. - # Or maybe some are even in memory? - # When does this happen, and how should we handle them? - assert fts is not ts - assert fts.planned_on is None or fts.planned_on is ws, ( - fts.planned_on, - ws, - ) + ws = state.decide_worker_from_family(fam) + + if ts.state == "queued": + state.queued.remove(ts) + worker_msgs: dict[str, list[Any]] = _add_to_processing(state, ts, ws) + + recommendations: Recs = {} + if fam: + siblings, downstream = fam + # Schedule other tasks that will all need to be in memory + # with this task on the same worker at once. + + # TODO what about when the first task in a family that's already on a worker completes, + # but not the whole family? We want to wait for the whole family to be done before assigning + # another one. We don't want to take a slot that could be used for the downstream task in the future. + + # If workers could be responsible with memory, this would be okay. Because if everything in the previous + # family is done execpt one input to the downstream, then yes, we might as well get started on a new family + # while we're waiting, as long as we have the memory capacity to do so. + for fts in sorted(siblings, key=operator.attrgetter("priority")): + # FIXME these tasks may not all necessarily be runnable. + # Or maybe some are even in memory? + # When does this happen, and how should we handle them? + assert fts is not ts + assert fts.planned_on is None or fts.planned_on is ws, ( + fts.planned_on, + ws, + ) - if fts.state == "processing": - assert fts.processing_on is ws, (fts.processing_on, ws) + if fts.state == "processing": + assert fts.processing_on is ws, (fts.processing_on, ws) - if fts.state == "memory": - assert ws in fts.who_has, (fts.who_has, ws) + if fts.state == "memory": + assert ws in fts.who_has, (fts.who_has, ws) - if fts.state == "queued": - if state.validate: - assert fts in state.queued - state.queued.discard(fts) + if fts.state == "queued": + if state.validate: + assert fts in state.queued + state.queued.discard(fts) - if fts.state in ("released", "waiting", "queued"): - fts.planned_on = ws + if fts.state in ("released", "waiting", "queued"): + fts.planned_on = ws - if not fts.waiting_on: # wtf if it's released?? - update_msgs(worker_msgs, _add_to_processing(state, fts, ws)) + if not fts.waiting_on: # wtf if it's released?? + update_msgs(worker_msgs, _add_to_processing(state, fts, ws)) - # This recommendation will be a no-op. It's just to remove any existing - # recommendation for the key from the recommendations queue. - recommendations[fts.key] = "processing" + # This recommendation will be a no-op. It's just to remove any existing + # recommendation for the key from the recommendations queue. + recommendations[fts.key] = "processing" - for cts in children: - if cts.planned_on is None: - cts.planned_on = ws + for cts in downstream: + if cts.planned_on is None: + cts.planned_on = ws return recommendations, {}, worker_msgs @@ -8262,40 +8270,110 @@ def _worker_full(ws: WorkerState, saturation_factor: float) -> bool: return _task_slots_available(ws, saturation_factor) <= 0 +def _previous_in_linear_chain(ts: TaskState, maxsize: int) -> TaskState | None: + if len(ts.dependents) != 1 or len(ts.dependencies) > maxsize: + return None + + prev: TaskState | None = None + for dts in ts.dependencies: + if len(dts.dependents) > maxsize: # widely-shared; ignore it + continue + if prev: + return None + prev = dts + + return prev + + +def _next_in_linear_chain(ts: TaskState, maxsize: int) -> TaskState | None: + if len(ts.dependents) != 1 or len(ts.dependencies) > maxsize: + return None + + # Check if this is part of a linear chain: + # exactly 1 dependency, excluding widely-shared tasks. + non_widely_shared = 0 + for dts in ts.dependencies: + if len(dts.dependents) > maxsize: # widely-shared; ignore it + continue + if non_widely_shared: + return None + non_widely_shared += 1 + + return next(iter(ts.dependents)) + + def family(ts: TaskState, maxsize: int) -> tuple[set[TaskState], set[TaskState]] | None: - if len(ts.dependents) > maxsize: + """ + All tasks in a family must be in memory at once to compute at least one common dependency. + + Returns these ``sibling`` tasks, and the set of ``downstream`` (dependent) tasks that + the siblings will used to compute. + + All ``siblings`` and ``downstream`` should be scheduled onto the same worker. + All ``siblings`` should be be scheduled at once---there's no benefit to queuing them. + + For the purposes of identifying families: + + * Linear chains are collapsed (traversed up and down) + * Widely-shared tasks (tasks with > ``maxsize`` dependents) are ignored + + If the task's family is too large, or empty, returns None. + That is, if ``siblings`` or ``downstream`` would be larger than ``maxsize``, + or ``ts.dependents`` is empty, returns None + """ + # TODO potentially could be useful to distinguish between 'too big' + # and empty family---we might want to schedule them differently. + # That is, maybe `None` and `(set(), set())` might not be synonymous. + if not ts.dependents or len(ts.dependents) > maxsize: return None siblings: set[TaskState] = set() - children: set[TaskState] = set() - for child in ts.dependents: # TODO even support multiple dependents? - # Traverse linear chains - while len(sibs := child.dependencies) == 1 and len(gc := child.dependents) == 1: - child = next(iter(gc)) + downstream: set[TaskState] = set() + # TODO maintain `seen` set to avoid repeated traversal? Should we add on the way down, or just back up? + for dts in ts.dependents: # TODO even support multiple dependents? + # Traverse down linear chains + while ndts := _next_in_linear_chain(dts, maxsize): + # TODO check seen + dts = ndts + + if dts in downstream: + # No need to traverse from a task we've already seen + continue + downstream.add(dts) + + if dts in siblings: + # `siblings` and `downstream` are exclusive, and `siblings` takes priority + siblings.remove(dts) + continue - children.add(child) + sibs = dts.dependencies if len(sibs) == 1: - # Faster path: nothing but linear chains, so no siblings besides `ts` + # Faster path: this is just a linear chain, so no siblings besides `ts` continue if len(sibs) < maxsize: for sts in sibs: # Traverse linear chains _back_ to root tasks - # FIXME probably wrong - while sts is not ts and len(sts.dependencies) == 1: - nsts = next(iter(sts.dependencies)) - if len(nsts.dependents) == 1: - sts = nsts - else: - break + while ( + sts is not ts + and sts not in downstream + and (psts := _previous_in_linear_chain(sts, maxsize)) + ): + # TODO check seen + sts = psts - if sts is ts: + if ( + sts is ts + or len(sts.dependents) > maxsize # ignore widely-shared siblings + or sts in downstream # a->b, b->c, a->c. downstream takes priority. + ): continue siblings.add(sts) if len(siblings) > maxsize: return None - return siblings, children + + return siblings, downstream def update_msgs(msgs: dict[str, list[Any]], new: dict[str, list[Any]]) -> None: diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index c540965fa54..4fcc3f40eb0 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -40,7 +40,7 @@ from distributed.core import ConnectionPool, Status, clean_exception, connect, rpc from distributed.metrics import time from distributed.protocol.pickle import dumps, loads -from distributed.scheduler import KilledWorker, MemoryState, Scheduler, WorkerState, family +from distributed.scheduler import KilledWorker, MemoryState, Scheduler, WorkerState from distributed.utils import TimeoutError from distributed.utils_test import ( BrokenComm, @@ -136,6 +136,7 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c): assert x.key in a.data or x.key in b.data +@pytest.mark.parametrize("sat", [float("inf"), 1.0]) @pytest.mark.parametrize("ndeps", [0, 1, 4]) @pytest.mark.parametrize( "nthreads", @@ -144,14 +145,13 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c): [("127.0.0.1", 3), ("127.0.0.1", 2), ("127.0.0.1", 1)], ], ) -def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads): +def test_decide_worker_coschedule_order_neighbors(sat, ndeps, nthreads): @gen_cluster( client=True, nthreads=nthreads, config={ "distributed.scheduler.work-stealing": False, - # "distributed.scheduler.worker-saturation": float("inf"), - "distributed.scheduler.worker-saturation": 1.0, + "distributed.scheduler.worker-saturation": sat, }, ) async def test_decide_worker_coschedule_order_neighbors_(c, s, *workers): @@ -253,22 +253,6 @@ def random(**kwargs): test_decide_worker_coschedule_order_neighbors_() -@gen_cluster(nthreads=[], client=True) -async def test_family(c, s): - ax = [delayed(i, name=f"a-{i}") for i in range(8)] - bx = [delayed(i, name=f"b-{i}") for i in range(8)] - cx = [delayed(i, name=f"c-{i}") for i in range(8)] - - zs = [a + b + c for a, b, c in zip(ax, bx, cx)] - - fs = c.compute(zs) - await async_wait_for(s.tasks, 5) - - a1 = s.tasks['a-1'] - - a1_fam = family(a1, 1000) - - @gen_cluster( nthreads=[("", 2)] * 4, client=True, diff --git a/distributed/tests/test_scheduler_family.py b/distributed/tests/test_scheduler_family.py new file mode 100644 index 00000000000..0ec4e0f676e --- /dev/null +++ b/distributed/tests/test_scheduler_family.py @@ -0,0 +1,222 @@ +from __future__ import annotations + +import operator + +from tlz import partition_all + +from dask import delayed + +from distributed.scheduler import family +from distributed.utils_test import async_wait_for, gen_cluster, slowidentity, slowinc + +ident = delayed(slowidentity, pure=True) +inc = delayed(slowinc, pure=True) +add = delayed(operator.add, pure=True) +dsum = delayed(sum, pure=True) + + +async def submit_delayed(client, scheduler, x): + "Submit a delayed object or list of them; wait until tasks are processed on scheduler" + + # dask.visualize(x, optimize_graph=True, collapse_outputs=True) + fs = client.compute(x) + await async_wait_for(lambda: scheduler.tasks, 5) + try: + key = fs.key + except AttributeError: + key = fs[0].key + await async_wait_for(lambda: scheduler.tasks[key].state != "released", 5) + return fs + + +@gen_cluster(nthreads=[], client=True) +async def test_family(c, s): + ax = [delayed(i, name=f"a-{i}") for i in range(3)] + bx = [delayed(i, name=f"b-{i}") for i in range(3)] + cx = [delayed(i, name=f"c-{i}") for i in range(3)] + + zs = [(a + b) + c for a, b, c in zip(ax, bx, cx)] + + _ = await submit_delayed(c, s, zs) + + a1 = s.tasks["a-1"] + b1 = s.tasks["b-1"] + c1 = s.tasks["c-1"] + + fam = family(a1, 1000) + assert fam + sibs, downstream = fam + assert sibs == {b1} + assert len(downstream) == 1 + add_1_1 = next(iter(downstream)) + + fam = family(c1, 1000) + assert fam + sibs, downstream = fam + assert sibs == {add_1_1} + assert len(downstream) == 1 # don't know keys + add_1_2 = next(iter(downstream)) + + fam = family(add_1_1, 1000) + assert fam + sibs, downstream = fam + assert sibs == {c1} + assert downstream == {add_1_2} + + assert family(add_1_2, 1000) is None + + +@gen_cluster(nthreads=[], client=True) +async def test_family_linear_chains(c, s): + r""" + final + / \ \ + / \ \ + -------------- + | s2 | s2 s2 + | / \ | / \ / \ + |/______ \ | | | | | + ------------- | | | | | | | + | s1 | | | | s1 | s1 | + | / | \ | | | | / | \ | / | \ | + | x | | | | | | x | | | x | | | + | | | | | | | | | | | | | | | | + | x | x | | x | x | x x x | x x + | | | | | | | | | | | | | | | | + | a b c | | d | a b c d a b c d + ------------- ------ / / / / / / / / + \ \ \ \ \ \ / / / / / / / / + r + """ + root = delayed(0, name="root") + ax = [ + ident(ident(inc(root, dask_key_name=f"a-{i}"))) for i in range(3) # 2-chain(z) + ] + bx = [inc(root, dask_key_name=f"b-{i}") for i in range(3)] # 0-chain + cx = [ident(inc(root, dask_key_name=f"c-{i}")) for i in range(3)] # 1-chain + s1x = [ + dsum([a, b, c], dask_key_name=f"s1-{i}") + for i, (a, b, c) in enumerate(zip(ax, bx, cx)) + ] + + dx = [ident(inc(root, dask_key_name=f"d-{i}")) for i in range(3)] # 1-chain + s2x = [ + add(s1, d, dask_key_name=f"s2-{i}") for i, (s1, d) in enumerate(zip(s1x, dx)) + ] + + final = dsum(s2x, dask_key_name="final") + + _ = await submit_delayed(c, s, final) + + root = s.tasks["root"] + a1 = s.tasks["a-1"] + b1 = s.tasks["b-1"] + c1 = s.tasks["c-1"] + d1 = s.tasks["d-1"] + s1_1 = s.tasks["s1-1"] + s2_1 = s.tasks["s2-1"] + final = s.tasks["final"] + + await async_wait_for(lambda: final.state == "waiting", 5) + + # `a` traverses chains up and down to find `b` and `c` + # Does *not* include `d`: `d` is not required to compute `s1` + fam = family(a1, 4) + assert fam + sibs, downstream = fam + assert sibs == {b1, c1} + assert downstream == {s1_1} + + # `d` traverses chains up to find `s2` + # does not traverse down past `s2` + fam = family(d1, 4) + assert fam + sibs, downstream = fam + assert sibs == {s1_1} + assert downstream == {s2_1} + + # Don't traverse a linear chain past self + mid_chain = next(iter(a1.dependents)) + fam = family(mid_chain, 4) + assert fam + sibs, downstream = fam + assert sibs == {b1, c1} + assert downstream == {s1_1} + + # `root` has no family with small cutoff + assert family(root, 4) is None + + # With large cutoff, `root` has no siblings. + # But the `s1` and `s2` tasks are all considered downstream, if you + # collapse the linear chains (which include `a`, `b`, `c`, `d`). + + # Note that `s1`s could be considered both siblings and downstreams + # (siblings, since they need to be in memory along with root to compute `s2`). + # But tasks that meet this criteria are explicitly labeled as only downstream. + fam = family(root, 1000) + assert fam + sibs, downstream = fam + assert sibs == set() + assert {ts.key for ts in downstream} == { + "s1-0", + "s1-1", + "s1-2", + "s2-0", + "s2-1", + "s2-2", + } + + +@gen_cluster(nthreads=[], client=True) +async def test_family_linear_chains_plus_widely_shared(c, s): + shared = delayed(0, name="shared") + roots = [delayed(i, name=f"r-{i}") for i in range(8)] + ax = [add(r, shared, dask_key_name=f"a-{i}") for i, r in enumerate(roots)] + sx = [ + dsum(axs, dask_key_name=f"s-{i}") for i, axs in enumerate(partition_all(3, ax)) + ] + + _ = await submit_delayed(c, s, sx) + + r0 = s.tasks["r-0"] + r1 = s.tasks["r-1"] + r2 = s.tasks["r-2"] + s0 = s.tasks["s-0"] + + fam = family(r0, 4) + assert fam + sibs, downstream = fam + assert sibs == {r1, r2} + assert downstream == {s0} + + +@gen_cluster(nthreads=[], client=True) +async def test_family_triangle(c, s): + r""" + z + /| + y | + \ | + x + """ + x = delayed(0, name="x") + y = inc(x, dask_key_name="y") + z = add(x, y, dask_key_name="z") + + _ = await submit_delayed(c, s, z) + + x = s.tasks["x"] + y = s.tasks["y"] + z = s.tasks["z"] + + fam = family(x, 4) + assert fam + sibs, downstream = fam + assert sibs == set() + assert downstream == {z} # `y` is just a linear chain, not downstream + + fam = family(y, 4) + assert fam + sibs, downstream = fam + assert sibs == {x} + assert downstream == {z} From b2157a9b68caa956b71ae64421db0fe0329d2bc6 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 29 Sep 2022 12:32:17 -0600 Subject: [PATCH 07/24] rip some things out to just focus on root tasks no need for `planned_on` when non-root tasks won't follow this code path anyway. normal `decide_worker` should do a decent job in that case. we do still need a `decide_worker_from_family` that goes wherever sibling tasks are already in memory or processing. this should basically make rescheduling work as well. --- distributed/scheduler.py | 96 +++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 56 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 73e01eb0257..daaf20dd493 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -17,7 +17,7 @@ import uuid import warnings import weakref -from collections import Counter, defaultdict, deque +from collections import defaultdict, deque from collections.abc import ( Callable, Collection, @@ -961,8 +961,6 @@ class TaskState: #: further in :doc:`Scheduling Policy `. priority: tuple[int, ...] - planned_on: WorkerState | None - # Attribute underlying the state property _state: TaskStateState @@ -1155,7 +1153,6 @@ def __init__(self, key: str, run_spec: object, state: TaskStateState): self.waiting_on = set() self.waiters = set() self.who_has = set() - self.planned_on = None self.processing_on = None self.has_lost_dependencies = False self.host_restrictions = None # type: ignore @@ -1928,37 +1925,6 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState: return ws - def decide_worker_from_family( - self, family: tuple[set[TaskState], set[TaskState]] | None - ) -> WorkerState: - if family is not None: - siblings, downstream = family - # First see if any downstream tasks are already planned somewhere; use that - candidates: Counter[WorkerState] = Counter() - for dts in downstream: - if pws := dts.planned_on: - if self.workers.get(pws.address) is pws: - candidates.update((pws,)) - else: - dts.planned_on = None - - if candidates: - return candidates.most_common(1)[0][0] - - # If not, see if any sibling tasks are already planned somewhere; use that - for dts in siblings: - if pws := dts.planned_on: - if self.workers.get(pws.address) is pws: - candidates.update((pws,)) - else: - dts.planned_on = None - - if candidates: - return candidates.most_common(1)[0][0] - - # No family, or nothing is planned anywhere. Pick a new worker. - return self.decide_worker_rootish_queuing_enabled() - def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None: """Pick a worker for a runnable non-root task, considering dependencies and restrictions. @@ -7714,14 +7680,23 @@ def _queueable_to_processing( if not state.idle: return {ts.key: "queued"}, {}, {} - fam = family(ts, 20) # TODO maxsize as config/what?! - ws = state.decide_worker_from_family(fam) + # NOTE: we don't have family-based decide-worker logic yet, since it's currently impossible for siblings to be spread + # across multiple workers, since all sibling tasks are scheduled onto the same worker at once. + # (This is relying on the assumption that all sibling tasks are always runnable, which `is_rootish` _kinda_ should guarantee but maybe + # could break in some cases). + # So the corollary of this is: + # * on scale-up, we pop a task off the queue and schedule all its siblings, all of which are guaranteed not to be running or + # in memory anywhere else yet. Sibling-based co-assignment is unnecessary. + # * on scale-down, all siblings are processing or in memory on the same worker, so they all get rescheduled. + # FIXME except some might have been replicated!! we should schedule near those replicas! + ws = state.decide_worker_rootish_queuing_enabled() if ts.state == "queued": state.queued.remove(ts) worker_msgs: dict[str, list[Any]] = _add_to_processing(state, ts, ws) recommendations: Recs = {} + fam = family(ts, 20) # TODO maxsize as config/what?! if fam: siblings, downstream = fam # Schedule other tasks that will all need to be in memory @@ -7739,35 +7714,43 @@ def _queueable_to_processing( # Or maybe some are even in memory? # When does this happen, and how should we handle them? assert fts is not ts - assert fts.planned_on is None or fts.planned_on is ws, ( - fts.planned_on, - ws, - ) - if fts.state == "processing": - assert fts.processing_on is ws, (fts.processing_on, ws) + # breaks our assumption that all siblings must be runnable at the same time + assert fts.state != "processing", (fts, fts.processing_on, ts, ws) + # if fts.state == "processing": + # assert fts.processing_on is ws, (fts.processing_on, ws) - if fts.state == "memory": - assert ws in fts.who_has, (fts.who_has, ws) + # if fts.state == "memory": + # # only can happen in the case of rescheduling, and replicas already exist + # continue + # # assert ws in fts.who_has, (fts.who_has, ws) if fts.state == "queued": if state.validate: assert fts in state.queued state.queued.discard(fts) - if fts.state in ("released", "waiting", "queued"): - fts.planned_on = ws + assert fts.state in ("released", "waiting", "queued"), (fts, ts) - if not fts.waiting_on: # wtf if it's released?? - update_msgs(worker_msgs, _add_to_processing(state, fts, ws)) + # TODO when is this not the case, and what should we do then? + # Violates the assumption that all siblings are runnable together + # (which is a bad assumption anyway). + # In that case, we have a task that's going to run later, once its deps + # are available. I guess it should just schedule near those deps. + # Unless they're widely-shared, in which case it should schedule near its family. + # In which case it should look root-ish, so it should come back here. + # Therefore, we again need a `decide_worker_from_family` that co-locates with + # in-memory or processing tasks. - # This recommendation will be a no-op. It's just to remove any existing - # recommendation for the key from the recommendations queue. - recommendations[fts.key] = "processing" + if ( + not fts.waiting_on + ): # wtf if it's released?? then this hasn't been set yet. + # assert not fts.waiting_on, (fts, fts.waiting_on, ts) + update_msgs(worker_msgs, _add_to_processing(state, fts, ws)) - for cts in downstream: - if cts.planned_on is None: - cts.planned_on = ws + # This recommendation will be a no-op. It's just to remove any existing + # recommendation for the key from the recommendations queue. + recommendations[fts.key] = "processing" return recommendations, {}, worker_msgs @@ -7783,7 +7766,6 @@ def _add_to_processing( assert (o := state.workers.get(ws.address)) is ws, (ws, o) state._set_duration_estimate(ts, ws) - ts.planned_on = None ts.processing_on = ws ts.state = "processing" state.acquire_resources(ts, ws) @@ -8373,6 +8355,8 @@ def family(ts: TaskState, maxsize: int) -> tuple[set[TaskState], set[TaskState]] if len(siblings) > maxsize: return None + # NOTE: `downstream` isn't used for scheduling yet, since family scheduling + # only applies to root tasks. But it would be used for STA. return siblings, downstream From 7487e3c0968b60b0933a44cf1c70c7f1823abe51 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 29 Sep 2022 13:00:00 -0600 Subject: [PATCH 08/24] WIP `decide_worker_from_family` deadlocks when a worker is restarted midway through currently --- distributed/scheduler.py | 74 +++++++++++++++++----------------------- 1 file changed, 32 insertions(+), 42 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index daaf20dd493..6553512f2b4 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1876,30 +1876,9 @@ def decide_worker_rootish_queuing_disabled( return ws - def decide_worker_rootish_queuing_enabled(self) -> WorkerState: - """Pick a worker for a runnable root-ish task, if not all are busy. - - Picks the least-busy worker out of the ``idle`` workers (idle workers have fewer - tasks running than threads, as set by ``distributed.scheduler.worker-saturation``). - It does not consider the location of dependencies, since they'll end up on every - worker anyway. - - If all workers are full, returns None, meaning the task should transition to - ``queued``. The scheduler will wait to send it to a worker until a thread opens - up. This ensures that downstream tasks always run before new root tasks are - started. - - This does not try to schedule sibling tasks on the same worker; in fact, it - usually does the opposite. Even though this increases subsequent data transfer, - it typically reduces overall memory use by eliminating root task overproduction. - - Returns - ------- - ws: WorkerState | None - The worker to assign the task to. If there are no idle workers, returns - None, in which case the task should be transitioned to ``queued``. - - """ + def decide_worker_from_family( + self, family: tuple[set[TaskState], set[TaskState]] | None + ) -> WorkerState: if self.validate: # We don't `assert self.is_rootish(ts)` here, because that check is dependent on # cluster size. It's possible a task looked root-ish when it was queued, but the @@ -1909,11 +1888,29 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState: assert not math.isinf(self.WORKER_SATURATION) assert self.idle - # if not self.idle: - # # All workers busy? Task gets/stays queued. - # return None + if family: + siblings, downstream = family + # If any tasks are in memory or processing, use the worker that holds the most data already. + candidates: defaultdict[WorkerState, int] = defaultdict(lambda: 0) + for ts in siblings: + for ws in ts.who_has: + candidates[ws] += ts.get_nbytes() + if ts.processing_on: # NOTE: exclusive with `ts.who_has` + tg = ts.group + nbytes_estimate = ( + round(tg.nbytes_total / nmem) + if (nmem := tg.states["memory"]) + else DEFAULT_DATA_SIZE + ) + candidates[ts.processing_on] += nbytes_estimate + if candidates: + ws, _ = max(candidates.items(), key=operator.itemgetter(1)) + logger.info( + f"Scheduling family on sibling worker {ws}, {candidates=}, {family=}" + ) + return ws - # Pick the least busy worker. + # No siblings are anywhere else (or no family at all). Pick the least busy worker. ws = min(self.idle.values(), key=lambda ws: len(ws.processing) / ws.nthreads) if self.validate: assert self.workers.get(ws.address) is ws @@ -7680,23 +7677,15 @@ def _queueable_to_processing( if not state.idle: return {ts.key: "queued"}, {}, {} - # NOTE: we don't have family-based decide-worker logic yet, since it's currently impossible for siblings to be spread - # across multiple workers, since all sibling tasks are scheduled onto the same worker at once. - # (This is relying on the assumption that all sibling tasks are always runnable, which `is_rootish` _kinda_ should guarantee but maybe - # could break in some cases). - # So the corollary of this is: - # * on scale-up, we pop a task off the queue and schedule all its siblings, all of which are guaranteed not to be running or - # in memory anywhere else yet. Sibling-based co-assignment is unnecessary. - # * on scale-down, all siblings are processing or in memory on the same worker, so they all get rescheduled. - # FIXME except some might have been replicated!! we should schedule near those replicas! - ws = state.decide_worker_rootish_queuing_enabled() + fam = family(ts, 20) # TODO maxsize as config/what?! + ws = state.decide_worker_from_family(fam) + # ^ NOTE: This is all we need to for good (re)scheduling when the cluster changes size. if ts.state == "queued": state.queued.remove(ts) worker_msgs: dict[str, list[Any]] = _add_to_processing(state, ts, ws) recommendations: Recs = {} - fam = family(ts, 20) # TODO maxsize as config/what?! if fam: siblings, downstream = fam # Schedule other tasks that will all need to be in memory @@ -7720,9 +7709,10 @@ def _queueable_to_processing( # if fts.state == "processing": # assert fts.processing_on is ws, (fts.processing_on, ws) - # if fts.state == "memory": - # # only can happen in the case of rescheduling, and replicas already exist - # continue + if fts.state == "memory": + # only can happen in the case of rescheduling, and replicas already exist + logger.info(f"Skipping in memory {fts}") + continue # # assert ws in fts.who_has, (fts.who_has, ws) if fts.state == "queued": From 06d74dcf7106a1196a777f215af55609133b9419 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 29 Sep 2022 19:35:41 -0600 Subject: [PATCH 09/24] ignore non-running candidates --- distributed/scheduler.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 6553512f2b4..5f9f475f11b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1894,8 +1894,12 @@ def decide_worker_from_family( candidates: defaultdict[WorkerState, int] = defaultdict(lambda: 0) for ts in siblings: for ws in ts.who_has: - candidates[ws] += ts.get_nbytes() - if ts.processing_on: # NOTE: exclusive with `ts.who_has` + if ws.status == Status.running: + candidates[ws] += ts.get_nbytes() + if ( + ts.processing_on # NOTE: exclusive with `ts.who_has` + and ts.processing_on.status == Status.running + ): tg = ts.group nbytes_estimate = ( round(tg.nbytes_total / nmem) From 7891b355784f645a1e37f2ba363465fd5af7ed0d Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 29 Sep 2022 19:42:39 -0600 Subject: [PATCH 10/24] controversial: set family maxsize to nthreads this almost certainly needs to be revisited. it's a sort of reasonable default, since maxsize is really about what's "widely-shared" or not. and a good definition of widely-shared is that it saturates the cluster. however, i have a feeling this can also be too small for small clusters. especially in a local cluster with, say, 8 threads, you could get reasonably-sized families split across workers instead of co-assigned. --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5f9f475f11b..5bb6e216adc 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7681,7 +7681,7 @@ def _queueable_to_processing( if not state.idle: return {ts.key: "queued"}, {}, {} - fam = family(ts, 20) # TODO maxsize as config/what?! + fam = family(ts, min(state.total_nthreads, 512)) # TODO maxsize as config/what?! ws = state.decide_worker_from_family(fam) # ^ NOTE: This is all we need to for good (re)scheduling when the cluster changes size. From e30bf019be61f82e36298e0550ba87e3e79a9418 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 29 Sep 2022 19:43:25 -0600 Subject: [PATCH 11/24] driveby: syntax ok `test_queued_paused_unpaused` didn't affect test, just unnecessarily wrong --- distributed/tests/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 4fcc3f40eb0..c332821db42 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -361,7 +361,7 @@ async def test_queued_paused_unpaused(c, s, a, b, queue): f1s = c.map(slowinc, range(16)) f2s = c.map(slowinc, f1s) - final = c.submit(sum, *f2s) + final = c.submit(sum, f2s) del f1s, f2s while not a.data or not b.data: From fec5fd84ad16a1c2e1c77e0055f8dad7b180d8ae Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 30 Sep 2022 00:18:11 -0600 Subject: [PATCH 12/24] widely-shared cutoff and cluster size this fixes a number of things: * when cluster is larger than tasks, explicitly break up co-assignment to ensure full utilization * separate max family size from widely-shared size. widely-shared size is directly related to cluster size; max size is just about an upper bound on iteration. * downstreams larger than max size shouldn't be downstreams --- distributed/scheduler.py | 83 ++++++++++++++-------- distributed/tests/test_scheduler.py | 64 ++++++++++++++++- distributed/tests/test_scheduler_family.py | 77 ++++++++++++++++---- 3 files changed, 181 insertions(+), 43 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5bb6e216adc..257350a5b71 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7681,7 +7681,8 @@ def _queueable_to_processing( if not state.idle: return {ts.key: "queued"}, {}, {} - fam = family(ts, min(state.total_nthreads, 512)) # TODO maxsize as config/what?! + # TODO maxsize as config/what?! + fam = family(ts, maxsize=20, widely_shared_cutoff=len(state.workers)) ws = state.decide_worker_from_family(fam) # ^ NOTE: This is all we need to for good (re)scheduling when the cluster changes size. @@ -7691,9 +7692,36 @@ def _queueable_to_processing( recommendations: Recs = {} if fam: - siblings, downstream = fam # Schedule other tasks that will all need to be in memory # with this task on the same worker at once. + siblings, downstream = fam + sorted_siblings = sorted(siblings, key=operator.attrgetter("priority")) + + # Check that we're parallelism-constrained before saturating a worker + if len(siblings) > ws.nthreads: + # In the somewhat rare case that we have more workers than families, + # parallelism is abundant. We should give up some co-assignment so that we + # don't leave workers idle. This is quite hard to determine statically, + # because we have no idea a) how many total root tasks there are, and b) how + # many total root families there are. + + # We can _very brittlely_ guess via `TaskGroup`s. _We generally assume the + # graph structure of root tasks is homogeneous_, which is typically true + # with Dask collections, but certainly not true in general. + # The downstreams' TaskGroups give a guess as to the number of families (a + # downstream is effectively the output of a family). + # If we assume every family is the same size as this one (also typically the + # case with collections, but not true in general), then each family will + # under-schedule by this factor, fully filling all workers. + est_total_families = ( + (sum(len(dts.group) for dts in downstream) // len(downstream)) + if downstream + else 0 + ) + family_saturation = est_total_families / len(state.workers) + if family_saturation < 1: + max_siblings = round(len(siblings) * family_saturation) + sorted_siblings = sorted_siblings[:max_siblings] # TODO what about when the first task in a family that's already on a worker completes, # but not the whole family? We want to wait for the whole family to be done before assigning @@ -7702,7 +7730,7 @@ def _queueable_to_processing( # If workers could be responsible with memory, this would be okay. Because if everything in the previous # family is done execpt one input to the downstream, then yes, we might as well get started on a new family # while we're waiting, as long as we have the memory capacity to do so. - for fts in sorted(siblings, key=operator.attrgetter("priority")): + for fts in sorted_siblings: # FIXME these tasks may not all necessarily be runnable. # Or maybe some are even in memory? # When does this happen, and how should we handle them? @@ -7726,20 +7754,13 @@ def _queueable_to_processing( assert fts.state in ("released", "waiting", "queued"), (fts, ts) - # TODO when is this not the case, and what should we do then? - # Violates the assumption that all siblings are runnable together - # (which is a bad assumption anyway). - # In that case, we have a task that's going to run later, once its deps - # are available. I guess it should just schedule near those deps. - # Unless they're widely-shared, in which case it should schedule near its family. - # In which case it should look root-ish, so it should come back here. - # Therefore, we again need a `decide_worker_from_family` that co-locates with - # in-memory or processing tasks. - + # When `fts` is not runnable yet, that means it's waiting for deps. So it + # should just schedule near those deps. Unless they're widely-shared, in + # which case it should schedule near its family. In which case it should + # look root-ish, so it should come back here. if ( not fts.waiting_on ): # wtf if it's released?? then this hasn't been set yet. - # assert not fts.waiting_on, (fts, fts.waiting_on, ts) update_msgs(worker_msgs, _add_to_processing(state, fts, ws)) # This recommendation will be a no-op. It's just to remove any existing @@ -8246,13 +8267,13 @@ def _worker_full(ws: WorkerState, saturation_factor: float) -> bool: return _task_slots_available(ws, saturation_factor) <= 0 -def _previous_in_linear_chain(ts: TaskState, maxsize: int) -> TaskState | None: - if len(ts.dependents) != 1 or len(ts.dependencies) > maxsize: +def _previous_in_linear_chain(ts: TaskState, cutoff: int) -> TaskState | None: + if len(ts.dependents) != 1 or len(ts.dependencies) > cutoff: return None prev: TaskState | None = None for dts in ts.dependencies: - if len(dts.dependents) > maxsize: # widely-shared; ignore it + if len(dts.dependents) > cutoff: # widely-shared; ignore it continue if prev: return None @@ -8261,15 +8282,15 @@ def _previous_in_linear_chain(ts: TaskState, maxsize: int) -> TaskState | None: return prev -def _next_in_linear_chain(ts: TaskState, maxsize: int) -> TaskState | None: - if len(ts.dependents) != 1 or len(ts.dependencies) > maxsize: +def _next_in_linear_chain(ts: TaskState, cutoff: int) -> TaskState | None: + if len(ts.dependents) != 1 or len(ts.dependencies) > cutoff: return None # Check if this is part of a linear chain: # exactly 1 dependency, excluding widely-shared tasks. non_widely_shared = 0 for dts in ts.dependencies: - if len(dts.dependents) > maxsize: # widely-shared; ignore it + if len(dts.dependents) > cutoff: # widely-shared; ignore it continue if non_widely_shared: return None @@ -8278,7 +8299,9 @@ def _next_in_linear_chain(ts: TaskState, maxsize: int) -> TaskState | None: return next(iter(ts.dependents)) -def family(ts: TaskState, maxsize: int) -> tuple[set[TaskState], set[TaskState]] | None: +def family( + ts: TaskState, maxsize: int, widely_shared_cutoff: int +) -> tuple[set[TaskState], set[TaskState]] | None: """ All tasks in a family must be in memory at once to compute at least one common dependency. @@ -8291,16 +8314,16 @@ def family(ts: TaskState, maxsize: int) -> tuple[set[TaskState], set[TaskState]] For the purposes of identifying families: * Linear chains are collapsed (traversed up and down) - * Widely-shared tasks (tasks with > ``maxsize`` dependents) are ignored + * Widely-shared tasks (tasks with > ``widely_shared_cutoff`` dependents) are ignored If the task's family is too large, or empty, returns None. - That is, if ``siblings`` or ``downstream`` would be larger than ``maxsize``, - or ``ts.dependents`` is empty, returns None + That is, if ``siblings`` would be larger than ``maxsize``, or ``downstream`` would be + larger than ``widely_shared_cutoff``, or ``ts.dependents`` is empty, returns None. """ # TODO potentially could be useful to distinguish between 'too big' # and empty family---we might want to schedule them differently. # That is, maybe `None` and `(set(), set())` might not be synonymous. - if not ts.dependents or len(ts.dependents) > maxsize: + if not ts.dependents or len(ts.dependents) > widely_shared_cutoff: return None siblings: set[TaskState] = set() @@ -8308,14 +8331,13 @@ def family(ts: TaskState, maxsize: int) -> tuple[set[TaskState], set[TaskState]] # TODO maintain `seen` set to avoid repeated traversal? Should we add on the way down, or just back up? for dts in ts.dependents: # TODO even support multiple dependents? # Traverse down linear chains - while ndts := _next_in_linear_chain(dts, maxsize): + while ndts := _next_in_linear_chain(dts, widely_shared_cutoff): # TODO check seen dts = ndts if dts in downstream: # No need to traverse from a task we've already seen continue - downstream.add(dts) if dts in siblings: # `siblings` and `downstream` are exclusive, and `siblings` takes priority @@ -8325,22 +8347,25 @@ def family(ts: TaskState, maxsize: int) -> tuple[set[TaskState], set[TaskState]] sibs = dts.dependencies if len(sibs) == 1: # Faster path: this is just a linear chain, so no siblings besides `ts` + downstream.add(dts) continue if len(sibs) < maxsize: + downstream.add(dts) for sts in sibs: # Traverse linear chains _back_ to root tasks while ( sts is not ts and sts not in downstream - and (psts := _previous_in_linear_chain(sts, maxsize)) + and (psts := _previous_in_linear_chain(sts, widely_shared_cutoff)) ): # TODO check seen sts = psts if ( sts is ts - or len(sts.dependents) > maxsize # ignore widely-shared siblings + or len(sts.dependents) + >= widely_shared_cutoff # ignore widely-shared siblings or sts in downstream # a->b, b->c, a->c. downstream takes priority. ): continue diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index c332821db42..5daa05391ac 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -17,7 +17,7 @@ import cloudpickle import psutil import pytest -from tlz import concat, first, merge, valmap +from tlz import concat, first, merge, partition, valmap from tornado.ioloop import IOLoop, PeriodicCallback import dask @@ -410,6 +410,68 @@ async def test_queued_remove_add_worker(c, s, a, b): await wait(fs) +@gen_cluster( + client=True, + nthreads=[("", 1)] * 6, + config={"distributed.scheduler.worker-saturation": 1.0}, +) +async def test_utilization_over_co_assignment(c, s, *workers): + event = Event() + roots = [delayed(event.wait)(5, dask_key_name=f"r-{i}") for i in range(6)] + aggs = [ + delayed(list)(rs, dask_key_name=f"a-{i}") + for i, rs in enumerate(partition(2, roots)) + ] + fs = c.compute(aggs) + + await async_wait_for(lambda: any(w.state.tasks for w in workers), timeout=5) + + # All workers should be used, even though it breaks up co-assignment + assert not s.idle + rts = [s.tasks[r.key] for r in roots] + assert {ts.processing_on for ts in rts} == set(s.workers.values()) + + await event.set() + await wait(fs) + + +@gen_cluster( + client=True, + nthreads=[("", 2)] * 2, + config={"distributed.scheduler.worker-saturation": 1.0}, +) +async def test_co_assign_scale_up(c, s, a, b): + event = Event() + devent = delayed(event) + roots = [devent.wait(5, dask_key_name=f"r-{i}") for i in range(16)] + aggs = [ + delayed(list)(rs, dask_key_name=f"a-{i}") + for i, rs in enumerate(partition(4, roots)) + ] + fs = c.compute(aggs) + + await async_wait_for(lambda: s.queued, timeout=5) + + # Each family of roots should be processing on the same worker, or not at all + for agg in aggs: + tss = s.tasks[agg.key].dependencies + proc = [ts.processing_on for ts in tss] + assert proc == proc[:1] * len(proc) + + async with Worker(s.address, nthreads=2) as w: + await async_wait_for(lambda: w.state.tasks, timeout=5) + assert len(w.state.tasks) == 5 # 4 `r` + the Event + + # Each family of roots should be processing on the same worker, or not at all + for agg in aggs: + tss = s.tasks[agg.key].dependencies + proc = [ts.processing_on for ts in tss] + assert len(set(proc)) == 1, proc + + await event.set() + await wait(fs) + + @pytest.mark.parametrize( "saturation, expected_task_counts", [ diff --git a/distributed/tests/test_scheduler_family.py b/distributed/tests/test_scheduler_family.py index 0ec4e0f676e..9e7d1197605 100644 --- a/distributed/tests/test_scheduler_family.py +++ b/distributed/tests/test_scheduler_family.py @@ -31,6 +31,13 @@ async def submit_delayed(client, scheduler, x): @gen_cluster(nthreads=[], client=True) async def test_family(c, s): + r""" + z z z + / | / | / | + x | x | x | + / \ | / \ | / \ | + a b c a b c a b c + """ ax = [delayed(i, name=f"a-{i}") for i in range(3)] bx = [delayed(i, name=f"b-{i}") for i in range(3)] cx = [delayed(i, name=f"c-{i}") for i in range(3)] @@ -43,27 +50,27 @@ async def test_family(c, s): b1 = s.tasks["b-1"] c1 = s.tasks["c-1"] - fam = family(a1, 1000) + fam = family(a1, 1000, 1000) assert fam sibs, downstream = fam assert sibs == {b1} assert len(downstream) == 1 add_1_1 = next(iter(downstream)) - fam = family(c1, 1000) + fam = family(c1, 1000, 1000) assert fam sibs, downstream = fam assert sibs == {add_1_1} assert len(downstream) == 1 # don't know keys add_1_2 = next(iter(downstream)) - fam = family(add_1_1, 1000) + fam = family(add_1_1, 1000, 1000) assert fam sibs, downstream = fam assert sibs == {c1} assert downstream == {add_1_2} - assert family(add_1_2, 1000) is None + assert family(add_1_2, 1000, 1000) is None @gen_cluster(nthreads=[], client=True) @@ -121,7 +128,7 @@ async def test_family_linear_chains(c, s): # `a` traverses chains up and down to find `b` and `c` # Does *not* include `d`: `d` is not required to compute `s1` - fam = family(a1, 4) + fam = family(a1, 1000, 4) assert fam sibs, downstream = fam assert sibs == {b1, c1} @@ -129,7 +136,7 @@ async def test_family_linear_chains(c, s): # `d` traverses chains up to find `s2` # does not traverse down past `s2` - fam = family(d1, 4) + fam = family(d1, 1000, 4) assert fam sibs, downstream = fam assert sibs == {s1_1} @@ -137,14 +144,14 @@ async def test_family_linear_chains(c, s): # Don't traverse a linear chain past self mid_chain = next(iter(a1.dependents)) - fam = family(mid_chain, 4) + fam = family(mid_chain, 1000, 4) assert fam sibs, downstream = fam assert sibs == {b1, c1} assert downstream == {s1_1} - # `root` has no family with small cutoff - assert family(root, 4) is None + # `root` has no family with small widely-shared cutoff + assert family(root, 1000, 4) is None # With large cutoff, `root` has no siblings. # But the `s1` and `s2` tasks are all considered downstream, if you @@ -153,7 +160,7 @@ async def test_family_linear_chains(c, s): # Note that `s1`s could be considered both siblings and downstreams # (siblings, since they need to be in memory along with root to compute `s2`). # But tasks that meet this criteria are explicitly labeled as only downstream. - fam = family(root, 1000) + fam = family(root, 1000, 1000) assert fam sibs, downstream = fam assert sibs == set() @@ -169,6 +176,14 @@ async def test_family_linear_chains(c, s): @gen_cluster(nthreads=[], client=True) async def test_family_linear_chains_plus_widely_shared(c, s): + r""" + s s s + /|\ /|\ /\ + a a a a a a a a + |\|\|\|\|/|/|/| + | | | | s | | | + r r r r r r r r + """ shared = delayed(0, name="shared") roots = [delayed(i, name=f"r-{i}") for i in range(8)] ax = [add(r, shared, dask_key_name=f"a-{i}") for i, r in enumerate(roots)] @@ -183,7 +198,7 @@ async def test_family_linear_chains_plus_widely_shared(c, s): r2 = s.tasks["r-2"] s0 = s.tasks["s-0"] - fam = family(r0, 4) + fam = family(r0, 1000, 4) assert fam sibs, downstream = fam assert sibs == {r1, r2} @@ -209,14 +224,50 @@ async def test_family_triangle(c, s): y = s.tasks["y"] z = s.tasks["z"] - fam = family(x, 4) + fam = family(x, 1000, 1000) assert fam sibs, downstream = fam assert sibs == set() assert downstream == {z} # `y` is just a linear chain, not downstream - fam = family(y, 4) + fam = family(y, 1000, 1000) assert fam sibs, downstream = fam assert sibs == {x} assert downstream == {z} + + +@gen_cluster(nthreads=[], client=True) +async def test_family_wide_gather_downstream(c, s): + r""" + s + / / / /|\ \ \ + i i i i i i i i + | | | | | | | | + r r r r r r r r + """ + roots = [delayed(i, name=f"r-{i}") for i in range(8)] + incs = [inc(r, dask_key_name=f"i-{i}") for i, r in enumerate(roots)] + sum = dsum(incs, dask_key_name="sum") + + _ = await submit_delayed(c, s, sum) + + rts = [s.tasks[r.key] for r in roots] + sts = s.tasks["sum"] + + fam = family(rts[0], 4, 1000) + assert fam + sibs, downstream = fam + assert sibs == set() + assert downstream == set() # `sum` not downstream because it's too large + + fam = family(rts[0], 1000, 1000) + assert fam + sibs, downstream = fam + assert sibs == set(rts[1:]) + assert downstream == {sts} + + +# TODO test family commutativity. Given any node X in any graph, calculate `family(X)`. +# For each sibling S, `family(S)` should give the same family, regardless of the +# starting node. From 4c84d9460e7725a7f27dfda3dd0fefcd6ea6bbdb Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 30 Sep 2022 00:50:25 -0600 Subject: [PATCH 13/24] just import dask in test --- distributed/tests/test_scheduler_family.py | 27 +++++++++++----------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/distributed/tests/test_scheduler_family.py b/distributed/tests/test_scheduler_family.py index 9e7d1197605..95715ea8144 100644 --- a/distributed/tests/test_scheduler_family.py +++ b/distributed/tests/test_scheduler_family.py @@ -4,20 +4,19 @@ from tlz import partition_all -from dask import delayed +import dask from distributed.scheduler import family from distributed.utils_test import async_wait_for, gen_cluster, slowidentity, slowinc -ident = delayed(slowidentity, pure=True) -inc = delayed(slowinc, pure=True) -add = delayed(operator.add, pure=True) -dsum = delayed(sum, pure=True) +ident = dask.delayed(slowidentity, pure=True) +inc = dask.delayed(slowinc, pure=True) +add = dask.delayed(operator.add, pure=True) +dsum = dask.delayed(sum, pure=True) async def submit_delayed(client, scheduler, x): "Submit a delayed object or list of them; wait until tasks are processed on scheduler" - # dask.visualize(x, optimize_graph=True, collapse_outputs=True) fs = client.compute(x) await async_wait_for(lambda: scheduler.tasks, 5) @@ -38,9 +37,9 @@ async def test_family(c, s): / \ | / \ | / \ | a b c a b c a b c """ - ax = [delayed(i, name=f"a-{i}") for i in range(3)] - bx = [delayed(i, name=f"b-{i}") for i in range(3)] - cx = [delayed(i, name=f"c-{i}") for i in range(3)] + ax = [dask.delayed(i, name=f"a-{i}") for i in range(3)] + bx = [dask.delayed(i, name=f"b-{i}") for i in range(3)] + cx = [dask.delayed(i, name=f"c-{i}") for i in range(3)] zs = [(a + b) + c for a, b, c in zip(ax, bx, cx)] @@ -95,7 +94,7 @@ async def test_family_linear_chains(c, s): \ \ \ \ \ \ / / / / / / / / r """ - root = delayed(0, name="root") + root = dask.delayed(0, name="root") ax = [ ident(ident(inc(root, dask_key_name=f"a-{i}"))) for i in range(3) # 2-chain(z) ] @@ -184,8 +183,8 @@ async def test_family_linear_chains_plus_widely_shared(c, s): | | | | s | | | r r r r r r r r """ - shared = delayed(0, name="shared") - roots = [delayed(i, name=f"r-{i}") for i in range(8)] + shared = dask.delayed(0, name="shared") + roots = [dask.delayed(i, name=f"r-{i}") for i in range(8)] ax = [add(r, shared, dask_key_name=f"a-{i}") for i, r in enumerate(roots)] sx = [ dsum(axs, dask_key_name=f"s-{i}") for i, axs in enumerate(partition_all(3, ax)) @@ -214,7 +213,7 @@ async def test_family_triangle(c, s): \ | x """ - x = delayed(0, name="x") + x = dask.delayed(0, name="x") y = inc(x, dask_key_name="y") z = add(x, y, dask_key_name="z") @@ -246,7 +245,7 @@ async def test_family_wide_gather_downstream(c, s): | | | | | | | | r r r r r r r r """ - roots = [delayed(i, name=f"r-{i}") for i in range(8)] + roots = [dask.delayed(i, name=f"r-{i}") for i in range(8)] incs = [inc(r, dask_key_name=f"i-{i}") for i, r in enumerate(roots)] sum = dsum(incs, dask_key_name="sum") From 58c0c327560791b53885ed35e8ac2d6f8dbbbcba Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 30 Sep 2022 00:50:55 -0600 Subject: [PATCH 14/24] test for a non-commutative family structure idk if/how this actually matters --- distributed/tests/test_scheduler_family.py | 28 +++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler_family.py b/distributed/tests/test_scheduler_family.py index 95715ea8144..4204cc93c3e 100644 --- a/distributed/tests/test_scheduler_family.py +++ b/distributed/tests/test_scheduler_family.py @@ -2,7 +2,7 @@ import operator -from tlz import partition_all +from tlz import partition, partition_all import dask @@ -270,3 +270,29 @@ async def test_family_wide_gather_downstream(c, s): # TODO test family commutativity. Given any node X in any graph, calculate `family(X)`. # For each sibling S, `family(S)` should give the same family, regardless of the # starting node. +# EXECPT THIS ISN'T TRUE + + +@gen_cluster(nthreads=[], client=True) +async def test_family_non_commutative(c, s): + roots = [dask.delayed(i, name=f"r-{i}") for i in range(16)] + aggs = [dsum(rs) for rs in partition(4, roots)] + extra = dsum([roots[::4]], dask_key_name="extra") + + _ = await submit_delayed(c, s, aggs + [extra]) + + rts = [s.tasks[r.key] for r in roots] + ats = [s.tasks[a.key] for a in aggs] + ets = s.tasks["extra"] + + fam = family(rts[0], 1000, 1000) + assert fam + sibs, downstream = fam + assert sibs == set(rts[1:4]) | {rts[4], rts[8], rts[12]} + assert downstream == {ats[0], ets} + + fam = family(rts[1], 1000, 1000) + assert fam + sibs, downstream = fam + assert sibs == {rts[0], rts[2], rts[3]} + assert downstream == {ats[0]} From b6b16118eef5e2f594605d88377b3b026ddd7c76 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 30 Sep 2022 00:51:09 -0600 Subject: [PATCH 15/24] simple scale down test --- distributed/tests/test_scheduler.py | 32 +++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 5daa05391ac..775784bca11 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -472,6 +472,38 @@ async def test_co_assign_scale_up(c, s, a, b): await wait(fs) +@gen_cluster( + client=True, + nthreads=[("", 2)] * 3, + config={"distributed.scheduler.worker-saturation": 1.0}, +) +async def test_co_assign_scale_down(c, s, *workers): + event = Event() + roots = [delayed(event.wait)(5, dask_key_name=f"r-{i}") for i in range(16)] + aggs = [ + delayed(list)(rs, dask_key_name=f"a-{i}") + for i, rs in enumerate(partition(4, roots)) + ] + # pin roots so we can check where they are at the end + fs = c.compute(aggs + roots) + + await async_wait_for(lambda: s.queued, timeout=5) + + await workers[0].close() + await event.set() + await wait(fs) + + for r in roots: + ts = s.tasks[r.key] + assert len(ts.who_has) == 1, ts.who_has + + for w in workers: + assert not w.transfer_incoming_log + + +# TODO test _where_ tasks get assigned on scale-down. They should prefer to go near their siblings. + + @pytest.mark.parametrize( "saturation, expected_task_counts", [ From f285ebd8f1c5e447075271cf34e4fb51b8a32676 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 30 Sep 2022 01:16:43 -0600 Subject: [PATCH 16/24] fix deadlock on reassignment, hackily --- distributed/scheduler.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 257350a5b71..f2159799cfc 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7747,20 +7747,27 @@ def _queueable_to_processing( continue # # assert ws in fts.who_has, (fts.who_has, ws) - if fts.state == "queued": + if fts.state == "released": + # FIXME if `fts` just went `memory->released`, `waiting_on` will inaccurately be empty. + # 1) this manual transition feels like bad practice + # 2) we can't be certain it shouldn't actually to to `forgotten` (without duplicating logic from `memory->released`) + # 3) it's just kinda weird that tasks can be in this broken `released` state at all. + # it feels degenerate to me. i kinda don't think `released->waiting` should be a transition, but rather + # a shared helper function like `handle_released_task` or something. + # TODO add a test that triggers the need for this + state._transition(fts.key, "waiting", "qtp") + elif fts.state == "queued": if state.validate: assert fts in state.queued state.queued.discard(fts) - assert fts.state in ("released", "waiting", "queued"), (fts, ts) + assert fts.state in ("waiting", "queued"), (fts, ts) # When `fts` is not runnable yet, that means it's waiting for deps. So it # should just schedule near those deps. Unless they're widely-shared, in # which case it should schedule near its family. In which case it should # look root-ish, so it should come back here. - if ( - not fts.waiting_on - ): # wtf if it's released?? then this hasn't been set yet. + if not fts.waiting_on: update_msgs(worker_msgs, _add_to_processing(state, fts, ws)) # This recommendation will be a no-op. It's just to remove any existing From d22174eef7b95ee18d1e7c9fc33599f418318635 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 30 Sep 2022 01:55:22 -0600 Subject: [PATCH 17/24] note question re skip saturated --- distributed/scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index f2159799cfc..989e2df18aa 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1891,6 +1891,7 @@ def decide_worker_from_family( if family: siblings, downstream = family # If any tasks are in memory or processing, use the worker that holds the most data already. + # TODO what if that worker is saturated? Should we skip it? candidates: defaultdict[WorkerState, int] = defaultdict(lambda: 0) for ts in siblings: for ws in ts.who_has: From 63274fa2c7ff065437de54d74aa3afeded5d7bf2 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 30 Sep 2022 01:56:16 -0600 Subject: [PATCH 18/24] fix `_next_in_linear_chain` could have traversed into widely-shared deps --- distributed/scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 989e2df18aa..69491e72240 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -8296,15 +8296,15 @@ def _next_in_linear_chain(ts: TaskState, cutoff: int) -> TaskState | None: # Check if this is part of a linear chain: # exactly 1 dependency, excluding widely-shared tasks. - non_widely_shared = 0 + next: TaskState | None = None for dts in ts.dependencies: if len(dts.dependents) > cutoff: # widely-shared; ignore it continue - if non_widely_shared: + if next: return None - non_widely_shared += 1 + next = dts - return next(iter(ts.dependents)) + return next def family( From 3e5250d32c39b47f888a8dcd8e3b479e29f1af0c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 3 Oct 2022 13:12:19 -0600 Subject: [PATCH 19/24] document & ignore heterogeneous sibling scheduling --- distributed/scheduler.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 69491e72240..1d290d4a895 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7732,21 +7732,24 @@ def _queueable_to_processing( # family is done execpt one input to the downstream, then yes, we might as well get started on a new family # while we're waiting, as long as we have the memory capacity to do so. for fts in sorted_siblings: - # FIXME these tasks may not all necessarily be runnable. - # Or maybe some are even in memory? - # When does this happen, and how should we handle them? assert fts is not ts - # breaks our assumption that all siblings must be runnable at the same time - assert fts.state != "processing", (fts, fts.processing_on, ts, ws) - # if fts.state == "processing": - # assert fts.processing_on is ws, (fts.processing_on, ws) + # Rare: siblings already running, or ran, somewhere else. + # Since all siblings are scheduled onto the same worker at the same time, they'll also + # usually share the same fate if that worker dies, and all be re-scheduled at once too. + # The exceptions are: + # - Some in-memory tasks could have been replicated to other workers, but not all. + # - Non-commutative families (siblings set is different depending on which root you start from). + # - Scale up/down could cross the `widely_shared_cutoff`, leading to different assessment of a family. + # TODO tests for these cases + if fts.state == "processing": + logger.info(f"Skipping processing {fts}, {fts.processing_on=}, {ws=}") + continue if fts.state == "memory": # only can happen in the case of rescheduling, and replicas already exist - logger.info(f"Skipping in memory {fts}") + logger.info(f"Skipping in memory {fts}, {ws in fts.who_has=}, {ws=}") continue - # # assert ws in fts.who_has, (fts.who_has, ws) if fts.state == "released": # FIXME if `fts` just went `memory->released`, `waiting_on` will inaccurately be empty. @@ -7765,9 +7768,9 @@ def _queueable_to_processing( assert fts.state in ("waiting", "queued"), (fts, ts) # When `fts` is not runnable yet, that means it's waiting for deps. So it - # should just schedule near those deps. Unless they're widely-shared, in - # which case it should schedule near its family. In which case it should - # look root-ish, so it should come back here. + # should just schedule near those deps using `decide_worker_non_rootish`. + # Unless they're widely-shared, in which case it should schedule near its + # family. In which case it should look root-ish, so it should come back here. if not fts.waiting_on: update_msgs(worker_msgs, _add_to_processing(state, fts, ws)) From 172bbc43648c467d2bb77940d4559f06d5d6deb4 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 3 Oct 2022 13:21:08 -0600 Subject: [PATCH 20/24] `decide_worker_from_family`: ignore saturated --- distributed/scheduler.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 1d290d4a895..a8dfa952865 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1890,24 +1890,30 @@ def decide_worker_from_family( if family: siblings, downstream = family - # If any tasks are in memory or processing, use the worker that holds the most data already. - # TODO what if that worker is saturated? Should we skip it? + # If any tasks are in memory or processing, use the non-saturated worker that holds the most data already. + # Ignoring saturated workers avoids a 'dogpile' in the case of unusual graph structures. + # ^ TODO test this candidates: defaultdict[WorkerState, int] = defaultdict(lambda: 0) + sws: WorkerState | None for ts in siblings: - for ws in ts.who_has: - if ws.status == Status.running: - candidates[ws] += ts.get_nbytes() + for sws in ts.who_has: + if sws.status == Status.running and not _worker_full( + sws, self.WORKER_SATURATION + ): + candidates[sws] += ts.get_nbytes() if ( - ts.processing_on # NOTE: exclusive with `ts.who_has` - and ts.processing_on.status == Status.running + (sws := ts.processing_on) # NOTE: exclusive with `ts.who_has` + and sws.status == Status.running + and not _worker_full(sws, self.WORKER_SATURATION) ): + # NOTE: siblings processing on different workers is a rare case tg = ts.group nbytes_estimate = ( round(tg.nbytes_total / nmem) if (nmem := tg.states["memory"]) else DEFAULT_DATA_SIZE ) - candidates[ts.processing_on] += nbytes_estimate + candidates[sws] += nbytes_estimate if candidates: ws, _ = max(candidates.items(), key=operator.itemgetter(1)) logger.info( From 8011b734c1dc1d056fe75b3865abbfce98bd579a Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 3 Oct 2022 17:40:24 -0600 Subject: [PATCH 21/24] Revert "fix `_next_in_linear_chain`" Returning `next` instead of `next(iter(ts.dependents))` was the completely wrong direction. This reverts commit 63274fa2c7ff065437de54d74aa3afeded5d7bf2. --- distributed/scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a8dfa952865..43af75dc322 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -8305,15 +8305,15 @@ def _next_in_linear_chain(ts: TaskState, cutoff: int) -> TaskState | None: # Check if this is part of a linear chain: # exactly 1 dependency, excluding widely-shared tasks. - next: TaskState | None = None + non_widely_shared = 0 for dts in ts.dependencies: if len(dts.dependents) > cutoff: # widely-shared; ignore it continue - if next: + if non_widely_shared: return None - next = dts + non_widely_shared += 1 - return next + return next(iter(ts.dependents)) def family( From a8a196b9591c25d0489284df125f05765c525255 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 3 Oct 2022 17:41:17 -0600 Subject: [PATCH 22/24] fixup `_next_in_linear_chain` --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 43af75dc322..19dad48bb64 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -8305,13 +8305,13 @@ def _next_in_linear_chain(ts: TaskState, cutoff: int) -> TaskState | None: # Check if this is part of a linear chain: # exactly 1 dependency, excluding widely-shared tasks. - non_widely_shared = 0 + non_widely_shared = False for dts in ts.dependencies: if len(dts.dependents) > cutoff: # widely-shared; ignore it continue if non_widely_shared: return None - non_widely_shared += 1 + non_widely_shared = True return next(iter(ts.dependents)) From 0922e5c293474e89acac16623fd1741013e30453 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 3 Oct 2022 18:20:08 -0600 Subject: [PATCH 23/24] don't sort idle set maintaining this sort adds a moderate amount of scheduler overhead when workers go in and out of it so often. especially when we had no need for it to be sorted in the first place. --- distributed/scheduler.py | 40 ++++++++++++++++++---------------------- distributed/stealing.py | 2 +- 2 files changed, 19 insertions(+), 23 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 19dad48bb64..9f0d602a125 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -26,7 +26,6 @@ Iterable, Iterator, Mapping, - Sequence, Set, ) from contextlib import suppress @@ -1307,8 +1306,7 @@ class SchedulerState: #: Workers that are currently in running state running: set[WorkerState] #: Workers that are currently in running state and not fully utilized - #: (actually a SortedDict, but the sortedcontainers package isn't annotated) - idle: dict[str, WorkerState] + idle: set[WorkerState] #: Workers that are fully utilized. May include non-running workers. saturated: set[WorkerState] total_nthreads: int @@ -1408,7 +1406,7 @@ def __init__( self.clients["fire-and-forget"] = ClientState("fire-and-forget") self.extensions = {} self.host_info = host_info - self.idle = SortedDict() + self.idle = set() self.n_tasks = 0 self.resources = resources self.saturated = set() @@ -1847,7 +1845,7 @@ def decide_worker_rootish_queuing_disabled( # See root-ish-ness note below in `decide_worker_rootish_queuing_enabled` assert math.isinf(self.WORKER_SATURATION) - pool = self.idle.values() if self.idle else self.running + pool = self.idle or self.running if not pool: return None @@ -1922,7 +1920,7 @@ def decide_worker_from_family( return ws # No siblings are anywhere else (or no family at all). Pick the least busy worker. - ws = min(self.idle.values(), key=lambda ws: len(ws.processing) / ws.nthreads) + ws = min(self.idle, key=lambda ws: len(ws.processing) / ws.nthreads) if self.validate: assert self.workers.get(ws.address) is ws assert not _worker_full(ws, self.WORKER_SATURATION), ( @@ -1973,13 +1971,11 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None: # group is also smaller than the cluster. # Fastpath when there are no related tasks or restrictions - worker_pool = self.idle or self.workers - # FIXME idle and workers are SortedDict's declared as dicts - # because sortedcontainers is not annotated - wp_vals = cast("Sequence[WorkerState]", worker_pool.values()) - n_workers: int = len(wp_vals) + # FIXME making a list here is silly, but so is this whole code path + worker_pool = list(self.idle or self.workers.values()) + n_workers: int = len(worker_pool) if n_workers < 20: # smart but linear in small case - ws = min(wp_vals, key=operator.attrgetter("occupancy")) + ws = min(worker_pool, key=operator.attrgetter("occupancy")) assert ws if ws.occupancy == 0: # special case to use round-robin; linear search @@ -1989,12 +1985,12 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None: start: int = self.n_tasks % n_workers i: int for i in range(n_workers): - wp_i = wp_vals[(i + start) % n_workers] + wp_i = worker_pool[(i + start) % n_workers] if wp_i.occupancy == 0: ws = wp_i break else: # dumb but fast in large case - ws = wp_vals[self.n_tasks % n_workers] + ws = worker_pool[self.n_tasks % n_workers] if self.validate and ws is not None: assert self.workers.get(ws.address) is ws @@ -2867,10 +2863,10 @@ def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0): else not _worker_full(ws, self.WORKER_SATURATION) ): if ws.status == Status.running: - idle[ws.address] = ws + idle.add(ws) saturated.discard(ws) else: - idle.pop(ws.address, None) + idle.discard(ws) if p > nc: pending: float = occ * (p - nc) / (p * nc) @@ -4584,7 +4580,7 @@ async def remove_worker( self.rpc.remove(address) del self.stream_comms[address] del self.aliases[ws.name] - self.idle.pop(ws.address, None) + self.idle.discard(ws) self.saturated.discard(ws) del self.workers[address] ws.status = Status.closed @@ -4860,21 +4856,21 @@ def validate_state(self, allow_overlap: bool = False) -> None: if not (set(self.workers) == set(self.stream_comms)): raise ValueError("Workers not the same in all collections") - assert self.running.issuperset(self.idle.values()), ( + assert self.running.issuperset(self.idle), ( self.running, - list(self.idle.values()), + self.idle, ) for w, ws in self.workers.items(): assert isinstance(w, str), (type(w), w) assert isinstance(ws, WorkerState), (type(ws), ws) assert ws.address == w if ws.status != Status.running: - assert ws.address not in self.idle + assert ws not in self.idle assert ws.long_running.issubset(ws.processing) if not ws.processing: assert not ws.occupancy if ws.status == Status.running: - assert ws.address in self.idle + assert ws in self.idle assert (ws.status == Status.running) == (ws in self.running) for ws in self.running: @@ -5173,7 +5169,7 @@ def handle_worker_status_change( self.send_all(client_msgs, worker_msgs) else: self.running.discard(ws) - self.idle.pop(ws.address, None) + self.idle.discard(ws) async def handle_request_refresh_who_has( self, keys: Iterable[str], worker: str, stimulus_id: str diff --git a/distributed/stealing.py b/distributed/stealing.py index cdbcce30c4e..49b30405696 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -400,7 +400,7 @@ def balance(self) -> None: with log_errors(): i = 0 # Paused and closing workers must never become thieves - potential_thieves = set(s.idle.values()) + potential_thieves = s.idle.copy() if not potential_thieves or len(potential_thieves) == len(s.workers): return victim: WorkerState | None From 3a0329ea713e81099caeb80957ee16c46afcaf13 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 4 Oct 2022 19:37:53 -0600 Subject: [PATCH 24/24] include maxsize in initial check --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9f0d602a125..c66f82d898d 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -8336,7 +8336,7 @@ def family( # TODO potentially could be useful to distinguish between 'too big' # and empty family---we might want to schedule them differently. # That is, maybe `None` and `(set(), set())` might not be synonymous. - if not ts.dependents or len(ts.dependents) > widely_shared_cutoff: + if not ts.dependents or len(ts.dependents) > min(widely_shared_cutoff, maxsize): return None siblings: set[TaskState] = set()