From 2579927e25d61456b5211592a447209c79d16b58 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 22 Feb 2024 11:39:10 +0000 Subject: [PATCH] Fix flaky test_compute_per_key --- distributed/dashboard/components/scheduler.py | 68 ++++++++++--------- .../dashboard/tests/test_scheduler_bokeh.py | 11 +-- 2 files changed, 42 insertions(+), 37 deletions(-) diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index f3577bc526f..b9412168234 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -51,7 +51,7 @@ from bokeh.themes import Theme from bokeh.transform import cumsum, factor_cmap, linear_cmap, stack from jinja2 import Environment, FileSystemLoader -from tlz import curry, pipe, valmap +from tlz import curry, pipe, second, valmap from tlz.curried import concat, groupby, map from tornado import escape @@ -1523,42 +1523,44 @@ def __init__(self, scheduler, **kwargs): def update(self): compute_times = defaultdict(float) - for key, ts in self.scheduler.task_prefixes.items(): - name = key_split(key) - for action, t in ts.all_durations.items(): + for name, tp in self.scheduler.task_prefixes.items(): + for action, t in tp.all_durations.items(): if action == "compute": compute_times[name] += t - # order by largest time first - compute_times = sorted(compute_times.items(), key=lambda x: x[1], reverse=True) - - # keep only time which are 2% of max or greater - if compute_times: - max_time = compute_times[0][1] * 0.02 - compute_times = [(n, t) for n, t in compute_times if t > max_time] - compute_colors = list() - compute_names = list() - compute_time = list() - total_time = 0 - for name, t in compute_times: - compute_names.append(name) - compute_colors.append(ts_color_of(name)) - compute_time.append(t) - total_time += t - - angles = [t / total_time * 2 * math.pi for t in compute_time] - - self.fig.x_range.factors = compute_names - - compute_result = dict( - angles=angles, - times=compute_time, - color=compute_colors, - names=compute_names, - formatted_time=[format_time(t) for t in compute_time], - ) + if not compute_times: + return - update(self.compute_source, compute_result) + # order by largest time first + compute_times = sorted(compute_times.items(), key=second, reverse=True) + + # Keep only times which are 2% of max or greater + max_time = compute_times[0][1] * 0.02 + compute_colors = [] + compute_names = [] + compute_time = [] + total_time = 0 + for name, t in compute_times: + if t < max_time: + break + compute_names.append(name) + compute_colors.append(ts_color_of(name)) + compute_time.append(t) + total_time += t + + angles = [t / total_time * 2 * math.pi for t in compute_time] + + self.fig.x_range.factors = compute_names + + compute_result = dict( + angles=angles, + times=compute_time, + color=compute_colors, + names=compute_names, + formatted_time=[format_time(t) for t in compute_time], + ) + + update(self.compute_source, compute_result) class AggregateAction(DashboardComponent): diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index ed61155d9d6..d0ddc21f901 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -1286,7 +1286,6 @@ async def test_compute_per_key(c, s, a, b): da = pytest.importorskip("dask.array") x = (da.ones((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False) - await x y = await dask.delayed(inc)(1).persist() z = (x + x.T) - x.mean(axis=0) @@ -1294,14 +1293,18 @@ async def test_compute_per_key(c, s, a, b): await c.compute(zsum) mbk.update() + + # Keep only times which are 2% of max or greater. + # This means that the list of names is not stable (but max time is always preserved) + assert mbk.compute_source.data["names"] + assert set(mbk.compute_source.data["names"]).issubset(s.task_prefixes) + assert "angles" in mbk.compute_source.data + http_client = AsyncHTTPClient() response = await http_client.fetch( "http://localhost:%d/individual-compute-time-per-key" % s.http_server.port ) assert response.code == 200 - assert ("sum-aggregate") in mbk.compute_source.data["names"] - assert ("add") in mbk.compute_source.data["names"] - assert "angles" in mbk.compute_source.data.keys() @gen_cluster(scheduler_kwargs={"http_prefix": "foo-bar", "dashboard": True})