From b2f7846151ef81367eee2269298c9295c2e42ec1 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 9 Nov 2022 10:07:35 -0700 Subject: [PATCH 1/9] Consistent worker selection for all no-deps cases --- distributed/scheduler.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index eb5828bf716..e1baa0f7fc1 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2097,7 +2097,7 @@ def decide_worker_rootish_queuing_disabled( else: # Last-used worker is full, unknown, retiring, or paused; # pick a new worker for the next few tasks - ws = min(pool, key=partial(self.worker_objective, ts)) + ws = min(pool, key=self.worker_objective_no_deps) tg.last_worker_tasks_left = math.floor( (len(tg) / self.total_nthreads) * ws.nthreads ) @@ -2152,7 +2152,7 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None: # Just pick the least busy worker. # NOTE: this will lead to worst-case scheduling with regards to co-assignment. - ws = min(self.idle.values(), key=lambda ws: len(ws.processing) / ws.nthreads) + ws = min(self.idle.values(), key=self.worker_objective_no_deps) if self.validate: assert not _worker_full(ws, self.WORKER_SATURATION), ( ws, @@ -2212,18 +2212,17 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None: wp_vals = cast("Sequence[WorkerState]", worker_pool.values()) n_workers: int = len(wp_vals) if n_workers < 20: # smart but linear in small case - ws = min(wp_vals, key=operator.attrgetter("occupancy")) + ws = min(wp_vals, key=self.worker_objective_no_deps) assert ws - if ws.occupancy == 0: + if sum(self.worker_objective_no_deps(ws)) == 0: # special case to use round-robin; linear search - # for next worker with zero occupancy (or just - # land back where we started). + # for next empty worker (or just land back where we started). wp_i: WorkerState start: int = self.n_tasks % n_workers i: int for i in range(n_workers): wp_i = wp_vals[(i + start) % n_workers] - if wp_i.occupancy == 0: + if sum(self.worker_objective_no_deps(wp_i)) == 0: ws = wp_i break else: # dumb but fast in large case @@ -3235,6 +3234,17 @@ def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple: else: return (start_time, ws.nbytes) + def worker_objective_no_deps(self, ws: WorkerState) -> tuple[float, float]: + """ + Objective function to determine the least-busy worker. + + Meant for use with tasks where dependencies are not relevant for scheduling + (no dependencies, or widely-shared). + + Minimize worker occupancy. If a tie then break with data storage. + """ + return (ws.occupancy / ws.nthreads, ws.nbytes / ws.memory_limit or 1) + def add_replica(self, ts: TaskState, ws: WorkerState): """Note that a worker holds a replica of a task with state='memory'""" ws.add_replica(ts) From 4d08596fbb69efdd86150c787f21a8c0a8c4d4a2 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 9 Nov 2022 16:59:21 -0700 Subject: [PATCH 2/9] fix zero-division --- distributed/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e1baa0f7fc1..7c3f7c237c3 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3243,7 +3243,7 @@ def worker_objective_no_deps(self, ws: WorkerState) -> tuple[float, float]: Minimize worker occupancy. If a tie then break with data storage. """ - return (ws.occupancy / ws.nthreads, ws.nbytes / ws.memory_limit or 1) + return (ws.occupancy / ws.nthreads, ws.nbytes / (ws.memory_limit or 1)) def add_replica(self, ts: TaskState, ws: WorkerState): """Note that a worker holds a replica of a task with state='memory'""" From d876fee9677941110f7615dee8bd44d7d175d2ac Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 9 Nov 2022 17:49:35 -0700 Subject: [PATCH 3/9] fix `test_retries` --- distributed/tests/test_client_executor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/distributed/tests/test_client_executor.py b/distributed/tests/test_client_executor.py index 1019c2d59f8..dc543dc304c 100644 --- a/distributed/tests/test_client_executor.py +++ b/distributed/tests/test_client_executor.py @@ -216,21 +216,25 @@ def test_retries(client): with client.get_executor(retries=5, pure=False) as e: future = e.submit(varying(args)) assert future.result() == 42 + del future with client.get_executor(retries=4) as e: future = e.submit(varying(args)) result = future.result() assert result == 42 + del future with client.get_executor(retries=2) as e: future = e.submit(varying(args)) with pytest.raises(ZeroDivisionError, match="two"): res = future.result() + del future with client.get_executor(retries=0) as e: future = e.submit(varying(args)) with pytest.raises(ZeroDivisionError, match="one"): res = future.result() + del future def test_shutdown_wait(client): From 5f873b8e52b20c84919568324e32f2215d95a8f5 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 9 Nov 2022 18:21:08 -0700 Subject: [PATCH 4/9] fix `test_decide_worker_rootish_while_last_worker_is_retiring` --- distributed/tests/test_scheduler.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 48ad99db771..db39a2e872d 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -276,6 +276,11 @@ async def test_decide_worker_rootish_while_last_worker_is_retiring(c, s, a): evy = Event() async with BlockedGatherDep(s.address, nthreads=1) as b: + # Take up some memory on the new worker, so it's the less appealing option for + # `worker_objective_no_deps` (occupancy of both should be the same). + ballast = c.submit(lambda: "x" * 4096, workers=[b.address], key="ballast") + await wait(ballast) + xs = [ c.submit(lambda ev: ev.wait(), evx[0], key="x-0", workers=[a.address]), c.submit(lambda ev: ev.wait(), evx[1], key="x-1", workers=[a.address]), @@ -302,8 +307,12 @@ async def test_decide_worker_rootish_while_last_worker_is_retiring(c, s, a): await evx[0].set() await wait_for_state("y-0", "processing", s) await wait_for_state("y-1", "processing", s) - assert s.tasks["y-2"].group.last_worker == s.workers[a.address] - assert s.tasks["y-2"].group.last_worker_tasks_left == 1 + if (last_worker := s.tasks["y-2"].group.last_worker) != s.workers[a.address]: + pytest.fail( + f"Test assumptions have changed: {a=} not selected, instead {last_worker}." + ) + if (tasks_left := s.tasks["y-2"].group.last_worker_tasks_left) != 1: + pytest.fail(f"Test assumptions have changed: {tasks_left=}.") # Take a out of the running pool, but without removing it from the cluster # completely From fa1af5cbb24f3744b6ff713f9aba49b228bc277c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 9 Nov 2022 19:44:37 -0700 Subject: [PATCH 5/9] Add memory-tiebreaker tests from #7248 --- distributed/tests/test_scheduler.py | 58 ++++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index db39a2e872d..c0a16575781 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -21,7 +21,7 @@ import dask from dask import delayed -from dask.utils import apply, parse_timedelta, stringify, tmpfile, typename +from dask.utils import apply, parse_bytes, parse_timedelta, stringify, tmpfile, typename from distributed import ( CancelledError, @@ -490,6 +490,62 @@ async def test_queued_remove_add_worker(c, s, a, b): await wait(fs) +@gen_cluster( + client=True, + nthreads=[("", 2)] * 3, +) +async def test_decide_worker_memory_tiebreaker_idle_cluster(c, s, *workers): + big = c.submit(lambda: "x" * 4096) + await big + f2 = c.submit(inc, 1) + await f2 + f3 = c.submit(inc, 2) + await f3 + + assert all(len(w.state.tasks) == 1 for w in workers), [ + w.state.tasks for w in workers + ] + + last = c.submit(inc, 3) + await last + + big_ts = s.tasks[big.key] + last_ts = s.tasks[last.key] + assert first(last_ts.who_has) is not first(big_ts.who_has) + + +@gen_cluster( + client=True, + nthreads=[], + config={ + "distributed.worker.memory.pause": False, + "distributed.worker.memory.target": False, + "distributed.worker.memory.spill": False, + }, +) +async def test_decide_worker_memory_tiebreaker_idle_heterogeneous_cluster(c, s): + async with Worker(s.address, nthreads=2, memory_limit="500mib") as w_small: + async with Worker(s.address, nthreads=2, memory_limit="1000mib") as w_large: + f200 = c.submit( + lambda: "x" * parse_bytes("200mib"), + workers=[w_small.address], + key="f200", + ) + f300 = c.submit( + lambda: "x" * parse_bytes("300mib"), + workers=[w_large.address], + key="f300", + ) + await wait([f200, f300]) + + # `w_large` has more absolute memory use, but a lower percentage memory use. + # We should pick it over `w_small` since it's under less memory pressure. + f = c.submit(inc, 1) + await f + + assert f.key in w_large.state.tasks + + @gen_cluster(client=True, nthreads=[("", 1)]) async def test_secede_opens_slot(c, s, a): first = Event() From 2256f899c0de4971308413df4ddbd53ab6c7b937 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 10 Nov 2022 11:18:27 -0700 Subject: [PATCH 6/9] `ignore_deps` --- distributed/scheduler.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 62fa2707c3a..d2aec1cfeb9 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2102,7 +2102,7 @@ def decide_worker_rootish_queuing_disabled( else: # Last-used worker is full, unknown, retiring, or paused; # pick a new worker for the next few tasks - ws = min(pool, key=self.worker_objective_no_deps) + ws = min(pool, key=self.worker_objective_ignore_deps) tg.last_worker_tasks_left = math.floor( (len(tg) / self.total_nthreads) * ws.nthreads ) @@ -2157,7 +2157,7 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None: # Just pick the least busy worker. # NOTE: this will lead to worst-case scheduling with regards to co-assignment. - ws = min(self.idle_task_count, key=self.worker_objective_no_deps) + ws = min(self.idle_task_count, key=self.worker_objective_ignore_deps) if self.validate: assert not _worker_full(ws, self.WORKER_SATURATION), ( ws, @@ -2217,9 +2217,9 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None: wp_vals = cast("Sequence[WorkerState]", worker_pool.values()) n_workers: int = len(wp_vals) if n_workers < 20: # smart but linear in small case - ws = min(wp_vals, key=self.worker_objective_no_deps) + ws = min(wp_vals, key=self.worker_objective_ignore_deps) assert ws - if sum(self.worker_objective_no_deps(ws)) == 0: + if sum(self.worker_objective_ignore_deps(ws)) == 0: # special case to use round-robin; linear search # for next empty worker (or just land back where we started). wp_i: WorkerState @@ -2227,7 +2227,7 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None: i: int for i in range(n_workers): wp_i = wp_vals[(i + start) % n_workers] - if sum(self.worker_objective_no_deps(wp_i)) == 0: + if sum(self.worker_objective_ignore_deps(wp_i)) == 0: ws = wp_i break else: # dumb but fast in large case @@ -3237,7 +3237,7 @@ def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple: else: return (start_time, ws.nbytes) - def worker_objective_no_deps(self, ws: WorkerState) -> tuple[float, float]: + def worker_objective_ignore_deps(self, ws: WorkerState) -> tuple[float, float]: """ Objective function to determine the least-busy worker. From f42a0508b2a794dcb96aefcf5a8c23e42eba872c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 10 Nov 2022 11:32:35 -0700 Subject: [PATCH 7/9] update docstring --- distributed/scheduler.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d2aec1cfeb9..d8399819ee7 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3239,10 +3239,16 @@ def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple: def worker_objective_ignore_deps(self, ws: WorkerState) -> tuple[float, float]: """ - Objective function to determine the least-busy worker. - - Meant for use with tasks where dependencies are not relevant for scheduling - (no dependencies, or widely-shared). + Objective function for tasks where dependencies are not relevant for scheduling. + + Meant for use with tasks with no dependencies, or more importantly, with + widely-shared dependencies. These are tasks where `is_rootish` is True. + `is_rootish` implies that the tasks in the group will be assigned to every + worker in cluster. Therefore, any dependencies they have will be copied to every + worker in the cluster. To prevent edge cases, the objective function matches + this assumption. Otherwise, worker selection might be imbalanced or "dogpile" + towards towards the dependencies. This can take up capacity on the worker that + might be better used for downstream tasks. Minimize worker occupancy. If a tie then break with data storage. """ From 99c5ae45ef8deb2c0c3aa347672e1614751e78d8 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 16 Nov 2022 18:49:30 -0700 Subject: [PATCH 8/9] Update distributed/tests/test_client_executor.py Co-authored-by: crusaderky --- distributed/tests/test_client_executor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/tests/test_client_executor.py b/distributed/tests/test_client_executor.py index dc543dc304c..93f1ec659a0 100644 --- a/distributed/tests/test_client_executor.py +++ b/distributed/tests/test_client_executor.py @@ -216,6 +216,7 @@ def test_retries(client): with client.get_executor(retries=5, pure=False) as e: future = e.submit(varying(args)) assert future.result() == 42 + # Ensure that the task is released on the scheduler before the next update-graph del future with client.get_executor(retries=4) as e: From e4f62f923f314084e58cdd28997dd9c3e6167c61 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 16 Nov 2022 18:49:48 -0700 Subject: [PATCH 9/9] absolute memory usage for now --- distributed/scheduler.py | 2 +- distributed/tests/test_scheduler.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d8399819ee7..866c25d5bb4 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3252,7 +3252,7 @@ def worker_objective_ignore_deps(self, ws: WorkerState) -> tuple[float, float]: Minimize worker occupancy. If a tie then break with data storage. """ - return (ws.occupancy / ws.nthreads, ws.nbytes / (ws.memory_limit or 1)) + return (ws.occupancy / ws.nthreads, ws.nbytes) def add_replica(self, ts: TaskState, ws: WorkerState): """Note that a worker holds a replica of a task with state='memory'""" diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 969aa477c8c..b32683ec1bc 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -517,6 +517,9 @@ async def test_decide_worker_memory_tiebreaker_idle_cluster(c, s, *workers): assert first(last_ts.who_has) is not first(big_ts.who_has) +@pytest.mark.skip( + "Not yet implemented: https://github.com/dask/distributed/issues/7266" +) @gen_cluster( client=True, nthreads=[],