Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from bokeh.themes import Theme
from bokeh.transform import cumsum, factor_cmap, linear_cmap, stack
from jinja2 import Environment, FileSystemLoader
from tlz import curry, pipe, valmap
from tlz import curry, pipe, second, valmap

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes to this module are all cosmetic

from tlz.curried import concat, groupby, map
from tornado import escape

Expand Down Expand Up @@ -1523,42 +1523,44 @@ def __init__(self, scheduler, **kwargs):
def update(self):
compute_times = defaultdict(float)

for key, ts in self.scheduler.task_prefixes.items():
name = key_split(key)
for action, t in ts.all_durations.items():
for name, tp in self.scheduler.task_prefixes.items():
for action, t in tp.all_durations.items():
if action == "compute":
compute_times[name] += t

# order by largest time first
compute_times = sorted(compute_times.items(), key=lambda x: x[1], reverse=True)

# keep only time which are 2% of max or greater
if compute_times:
max_time = compute_times[0][1] * 0.02
compute_times = [(n, t) for n, t in compute_times if t > max_time]
compute_colors = list()
compute_names = list()
compute_time = list()
total_time = 0
for name, t in compute_times:
compute_names.append(name)
compute_colors.append(ts_color_of(name))
compute_time.append(t)
total_time += t

angles = [t / total_time * 2 * math.pi for t in compute_time]

self.fig.x_range.factors = compute_names

compute_result = dict(
angles=angles,
times=compute_time,
color=compute_colors,
names=compute_names,
formatted_time=[format_time(t) for t in compute_time],
)
if not compute_times:
return

update(self.compute_source, compute_result)
# order by largest time first
compute_times = sorted(compute_times.items(), key=second, reverse=True)

# Keep only times which are 2% of max or greater
max_time = compute_times[0][1] * 0.02
compute_colors = []
compute_names = []
compute_time = []
total_time = 0
for name, t in compute_times:
if t < max_time:
break
compute_names.append(name)
compute_colors.append(ts_color_of(name))
compute_time.append(t)
total_time += t

angles = [t / total_time * 2 * math.pi for t in compute_time]

self.fig.x_range.factors = compute_names

compute_result = dict(
angles=angles,
times=compute_time,
color=compute_colors,
names=compute_names,
formatted_time=[format_time(t) for t in compute_time],
)

update(self.compute_source, compute_result)


class AggregateAction(DashboardComponent):
Expand Down
11 changes: 7 additions & 4 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -1286,22 +1286,25 @@ async def test_compute_per_key(c, s, a, b):

da = pytest.importorskip("dask.array")
x = (da.ones((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False)

await x
y = await dask.delayed(inc)(1).persist()
z = (x + x.T) - x.mean(axis=0)
zsum = z.sum()
await c.compute(zsum)

mbk.update()

# Keep only times which are 2% of max or greater.
# This means that the list of names is not stable (but max time is always preserved)
assert mbk.compute_source.data["names"]
assert set(mbk.compute_source.data["names"]).issubset(s.task_prefixes)
assert "angles" in mbk.compute_source.data

http_client = AsyncHTTPClient()
response = await http_client.fetch(
"http://localhost:%d/individual-compute-time-per-key" % s.http_server.port
)
assert response.code == 200
assert ("sum-aggregate") in mbk.compute_source.data["names"]
assert ("add") in mbk.compute_source.data["names"]
assert "angles" in mbk.compute_source.data.keys()


@gen_cluster(scheduler_kwargs={"http_prefix": "foo-bar", "dashboard": True})
Expand Down