From 768796dc7028134cc629bddbc37acff8a0b03c68 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 8 Sep 2022 13:40:52 +0200 Subject: [PATCH 1/6] Remove declassification of fast keys in steal_time_ratio --- distributed/dashboard/components/scheduler.py | 4 +-- distributed/stealing.py | 6 ++-- distributed/tests/test_steal.py | 35 +++++++++++++++++++ 3 files changed, 39 insertions(+), 6 deletions(-) diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index d89483472da..cd38f364c8f 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -1783,7 +1783,7 @@ def __init__(self, scheduler, **kwargs): self.last = 0 self.source = ColumnDataSource( { - "time": [time() - 20, time()], + "time": [time() - 60, time()], "level": [0, 15], "color": ["white", "white"], "duration": [0, 0], @@ -1828,7 +1828,7 @@ def convert(self, msgs): """Convert a log message to a glyph""" total_duration = 0 for msg in msgs: - time, level, key, duration, sat, occ_sat, idl, occ_idl = msg + time, level, key, duration, sat, occ_sat, idl, occ_idl = msg[:8] total_duration += duration try: diff --git a/distributed/stealing.py b/distributed/stealing.py index 51cfd379a8c..3933e360048 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -237,18 +237,16 @@ def steal_time_ratio(self, ts: TaskState) -> tuple[float, int] | tuple[None, Non assert ts.processing_on ws = ts.processing_on compute_time = ws.processing[ts] - if compute_time < 0.005: # 5ms, just give up - return None, None nbytes = ts.get_nbytes_deps() transfer_time = nbytes / self.scheduler.bandwidth + LATENCY cost_multiplier = transfer_time / compute_time - if cost_multiplier > 100: - return None, None level = int(round(log2(cost_multiplier) + 6)) if level < 1: level = 1 + elif level > len(self.cost_multipliers): + return None, None return cost_multiplier, level diff --git a/distributed/tests/test_steal.py 
b/distributed/tests/test_steal.py index 6f3874bc1b2..62e2244bb0a 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1350,3 +1350,38 @@ def test_steal_worker_state(ws_with_running_task): assert "x" not in ws.tasks assert "x" not in ws.data assert ws.available_resources == {"R": 1} + + +@pytest.mark.slow() +@gen_cluster(nthreads=[("", 1)] * 4, client=True) +async def test_steal_very_fast_tasks(c, s, *workers): + # Ensure that very fast tasks + root = dask.delayed(lambda n: "x" * n)( + dask.utils.parse_bytes("1MiB"), dask_key_name="root" + ) + + @dask.delayed + def func(*args): + import time + + time.sleep(0.002) + + ntasks = 1000 + results = [func(root, i) for i in range(ntasks)] + futs = c.compute(results) + await c.gather(futs) + + dat = {} + max_ = 0 + rest = 0 + for w in workers: + ntasks = len(w.data) + if ntasks > max_: + rest += max_ + max_ = ntasks + else: + rest += ntasks + dat[w] = len(w.data) + assert ntasks > ntasks / len(workers) * 0.5 + + assert max_ < rest * 2 / 3 From 35d16d3a281ef7930e272326fdcabcd550858dbb Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 8 Sep 2022 14:48:24 +0200 Subject: [PATCH 2/6] Clean up test --- distributed/tests/test_steal.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 62e2244bb0a..40c615c8ef0 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1355,7 +1355,7 @@ def test_steal_worker_state(ws_with_running_task): @pytest.mark.slow() @gen_cluster(nthreads=[("", 1)] * 4, client=True) async def test_steal_very_fast_tasks(c, s, *workers): - # Ensure that very fast tasks + # Ensure that very fast tasks are allowed to be stolen root = dask.delayed(lambda n: "x" * n)( dask.utils.parse_bytes("1MiB"), dask_key_name="root" ) @@ -1371,7 +1371,6 @@ def func(*args): futs = c.compute(results) await c.gather(futs) - dat = {} max_ = 0 rest = 0 for w in workers: @@ -1381,7 
+1380,5 @@ def func(*args): max_ = ntasks else: rest += ntasks - dat[w] = len(w.data) - assert ntasks > ntasks / len(workers) * 0.5 assert max_ < rest * 2 / 3 From 80216bab5e66a161fd1552af4156bf5e226b4a2b Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 8 Sep 2022 15:00:33 +0200 Subject: [PATCH 3/6] Handle case for long running tasks --- distributed/stealing.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/distributed/stealing.py b/distributed/stealing.py index 3933e360048..f1539c886e6 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -238,6 +238,12 @@ def steal_time_ratio(self, ts: TaskState) -> tuple[float, int] | tuple[None, Non ws = ts.processing_on compute_time = ws.processing[ts] + if not compute_time: + # occupancy/ws.processing[ts] is only allowed to be zero for + # long running tasks which cannot be stolen + assert ts in ws.long_running + return None, None + nbytes = ts.get_nbytes_deps() transfer_time = nbytes / self.scheduler.bandwidth + LATENCY cost_multiplier = transfer_time / compute_time @@ -245,7 +251,7 @@ def steal_time_ratio(self, ts: TaskState) -> tuple[float, int] | tuple[None, Non level = int(round(log2(cost_multiplier) + 6)) if level < 1: level = 1 - elif level > len(self.cost_multipliers): + elif level >= len(self.cost_multipliers): return None, None return cost_multiplier, level From 56c87e92b3bb69240b7d8c7e0971e0b975bb962b Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 8 Sep 2022 15:06:38 +0200 Subject: [PATCH 4/6] Rewrite test --- distributed/tests/test_steal.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 40c615c8ef0..155d5238141 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1371,14 +1371,15 @@ def func(*args): futs = c.compute(results) await c.gather(futs) - max_ = 0 - rest = 0 + # The following is an attempt to estimate and assert
a roughly uniform data + # distribution + max_ntasks_on_worker = 0 for w in workers: - ntasks = len(w.data) - if ntasks > max_: - rest += max_ - max_ = ntasks - else: - rest += ntasks - - assert max_ < rest * 2 / 3 + ntasks_on_worker = len(w.data) + if ntasks_on_worker > max_ntasks_on_worker: + max_ntasks_on_worker = ntasks_on_worker + assert ntasks_on_worker > ntasks / len(workers) * 0.5 + + ntasks_on_other_workers = ntasks - max_ntasks_on_worker + nother_workers = len(workers) - 1 + assert max_ntasks_on_worker < 2 * ntasks_on_other_workers / nother_workers From 7429a82c4ae32b1e33b586c8faddcdb23b7b3827 Mon Sep 17 00:00:00 2001 From: Florian Jetter Date: Fri, 9 Sep 2022 11:11:00 +0200 Subject: [PATCH 5/6] Update distributed/tests/test_steal.py Co-authored-by: Gabe Joseph --- distributed/tests/test_steal.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 155d5238141..ce97df9df2c 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1371,15 +1371,7 @@ def func(*args): futs = c.compute(results) await c.gather(futs) - # The following is an attempt to estimate and assert a roughly uniform data - # distribution - max_ntasks_on_worker = 0 - for w in workers: - ntasks_on_worker = len(w.data) - if ntasks_on_worker > max_ntasks_on_worker: - max_ntasks_on_worker = ntasks_on_worker - assert ntasks_on_worker > ntasks / len(workers) * 0.5 - - ntasks_on_other_workers = ntasks - max_ntasks_on_worker - nother_workers = len(workers) - 1 - assert max_ntasks_on_worker < 2 * ntasks_on_other_workers / nother_workers + ntasks_per_worker = np.array([len(w.data) for w in workers]) + ideal = ntasks / len(workers) + assert (ntasks_per_worker > ideal * 0.5).all(), (ideal, ntasks_per_worker) + assert (ntasks_per_worker < ideal * 1.5).all(), (ideal, ntasks_per_worker) From 84e3d530851a0e29603607e85ae6b0ff52863958 Mon Sep 17 00:00:00 2001 From: fjetter 
Date: Fri, 9 Sep 2022 11:11:42 +0200 Subject: [PATCH 6/6] Add numpy import --- distributed/tests/test_steal.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index ce97df9df2c..4a3751d6d91 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -10,6 +10,7 @@ from operator import mul from time import sleep +import numpy as np import pytest from tlz import sliding_window