diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b495332979a..866c25d5bb4 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2102,7 +2102,7 @@ def decide_worker_rootish_queuing_disabled( else: # Last-used worker is full, unknown, retiring, or paused; # pick a new worker for the next few tasks - ws = min(pool, key=partial(self.worker_objective, ts)) + ws = min(pool, key=self.worker_objective_ignore_deps) tg.last_worker_tasks_left = math.floor( (len(tg) / self.total_nthreads) * ws.nthreads ) @@ -2157,10 +2157,7 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None: # Just pick the least busy worker. # NOTE: this will lead to worst-case scheduling with regards to co-assignment. - ws = min( - self.idle_task_count, - key=lambda ws: len(ws.processing) / ws.nthreads, - ) + ws = min(self.idle_task_count, key=self.worker_objective_ignore_deps) if self.validate: assert not _worker_full(ws, self.WORKER_SATURATION), ( ws, @@ -2220,18 +2217,17 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None: wp_vals = cast("Sequence[WorkerState]", worker_pool.values()) n_workers: int = len(wp_vals) if n_workers < 20: # smart but linear in small case - ws = min(wp_vals, key=operator.attrgetter("occupancy")) + ws = min(wp_vals, key=self.worker_objective_ignore_deps) assert ws - if ws.occupancy == 0: + if sum(self.worker_objective_ignore_deps(ws)) == 0: # special case to use round-robin; linear search - # for next worker with zero occupancy (or just - # land back where we started). + # for next empty worker (or just land back where we started). 
wp_i: WorkerState start: int = self.n_tasks % n_workers i: int for i in range(n_workers): wp_i = wp_vals[(i + start) % n_workers] - if wp_i.occupancy == 0: + if sum(self.worker_objective_ignore_deps(wp_i)) == 0: ws = wp_i break else: # dumb but fast in large case @@ -3241,6 +3237,23 @@ def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple: else: return (start_time, ws.nbytes) + def worker_objective_ignore_deps(self, ws: WorkerState) -> tuple[float, float]: + """ + Objective function for tasks where dependencies are not relevant for scheduling. + + Meant for use with tasks with no dependencies, or more importantly, with + widely-shared dependencies. These are tasks where `is_rootish` is True. + `is_rootish` implies that the tasks in the group will be assigned to every + worker in the cluster. Therefore, any dependencies they have will be copied to every + worker in the cluster. To prevent edge cases, the objective function matches + this assumption. Otherwise, worker selection might be imbalanced or "dogpile" + towards the dependencies. This can take up capacity on the worker that + might be better used for downstream tasks. + + Minimize worker occupancy. If a tie then break with data storage. 
+ """ + return (ws.occupancy / ws.nthreads, ws.nbytes) + def add_replica(self, ts: TaskState, ws: WorkerState): """Note that a worker holds a replica of a task with state='memory'""" ws.add_replica(ts) diff --git a/distributed/tests/test_client_executor.py b/distributed/tests/test_client_executor.py index 1019c2d59f8..93f1ec659a0 100644 --- a/distributed/tests/test_client_executor.py +++ b/distributed/tests/test_client_executor.py @@ -216,21 +216,26 @@ def test_retries(client): with client.get_executor(retries=5, pure=False) as e: future = e.submit(varying(args)) assert future.result() == 42 + # Ensure that the task is released on the scheduler before the next update-graph + del future with client.get_executor(retries=4) as e: future = e.submit(varying(args)) result = future.result() assert result == 42 + del future with client.get_executor(retries=2) as e: future = e.submit(varying(args)) with pytest.raises(ZeroDivisionError, match="two"): res = future.result() + del future with client.get_executor(retries=0) as e: future = e.submit(varying(args)) with pytest.raises(ZeroDivisionError, match="one"): res = future.result() + del future def test_shutdown_wait(client): diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index e5028f8746a..b32683ec1bc 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -21,7 +21,7 @@ import dask from dask import delayed -from dask.utils import apply, parse_timedelta, stringify, tmpfile, typename +from dask.utils import apply, parse_bytes, parse_timedelta, stringify, tmpfile, typename from distributed import ( CancelledError, @@ -276,6 +276,11 @@ async def test_decide_worker_rootish_while_last_worker_is_retiring(c, s, a): evy = Event() async with BlockedGatherDep(s.address, nthreads=1) as b: + # Take up some memory on the new worker, so it's the less appealing option for + # `worker_objective_ignore_deps` (occupancy of both should be the same). 
+ ballast = c.submit(lambda: "x" * 4096, workers=[b.address], key="ballast") + await wait(ballast) + xs = [ c.submit(lambda ev: ev.wait(), evx[0], key="x-0", workers=[a.address]), c.submit(lambda ev: ev.wait(), evx[1], key="x-1", workers=[a.address]), @@ -302,8 +307,12 @@ async def test_decide_worker_rootish_while_last_worker_is_retiring(c, s, a): await evx[0].set() await wait_for_state("y-0", "processing", s) await wait_for_state("y-1", "processing", s) - assert s.tasks["y-2"].group.last_worker == s.workers[a.address] - assert s.tasks["y-2"].group.last_worker_tasks_left == 1 + if (last_worker := s.tasks["y-2"].group.last_worker) != s.workers[a.address]: + pytest.fail( + f"Test assumptions have changed: {a=} not selected, instead {last_worker}." + ) + if (tasks_left := s.tasks["y-2"].group.last_worker_tasks_left) != 1: + pytest.fail(f"Test assumptions have changed: {tasks_left=}.") # Take a out of the running pool, but without removing it from the cluster # completely @@ -484,6 +493,65 @@ async def test_queued_remove_add_worker(c, s, a, b): await wait(fs) +@gen_cluster( + client=True, + nthreads=[("", 2)] * 3, +) +async def test_decide_worker_memory_tiebreaker_idle_cluster(c, s, *workers): + big = c.submit(lambda: "x" * 4096) + await big + f2 = c.submit(inc, 1) + await f2 + f3 = c.submit(inc, 2) + await f3 + + assert all(len(w.state.tasks) == 1 for w in workers), [ + w.state.tasks for w in workers + ] + + last = c.submit(inc, 3) + await last + + big_ts = s.tasks[big.key] + last_ts = s.tasks[last.key] + assert first(last_ts.who_has) is not first(big_ts.who_has) + + +@pytest.mark.skip( + "Not yet implemented: https://github.com/dask/distributed/issues/7266" +) +@gen_cluster( + client=True, + nthreads=[], + config={ + "distributed.worker.memory.pause": False, + "distributed.worker.memory.target": False, + "distributed.worker.memory.spill": False, + }, +) +async def test_decide_worker_memory_tiebreaker_idle_heterogeneous_cluster(c, s): + async with Worker(s.address, 
nthreads=2, memory_limit="500mib") as w_small: + async with Worker(s.address, nthreads=2, memory_limit="1000mib") as w_large: + f200 = c.submit( + lambda: "x" * parse_bytes("200mib"), + workers=[w_small.address], + key="f200", + ) + f300 = c.submit( + lambda: "x" * parse_bytes("300mib"), + workers=[w_large.address], + key="f300", + ) + await wait([f200, f300]) + + # `w_large` has more absolute memory use, but a lower percentage memory use. + # We should pick it over `w_small` since it's under less memory pressure. + f = c.submit(inc, 1) + await f + + assert f.key in w_large.state.tasks + + @gen_cluster(client=True, nthreads=[("", 1)]) async def test_secede_opens_slot(c, s, a): first = Event()