Skip to content
1 change: 1 addition & 0 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1943,6 +1943,7 @@ class TaskGraph(DashboardComponent):
def __init__(self, scheduler, **kwargs):
self.scheduler = scheduler
self.layout = GraphLayout(scheduler)
scheduler.add_plugin(self.layout)
self.invisible_count = 0 # number of invisible nodes

self.node_source = ColumnDataSource(
Expand Down
7 changes: 3 additions & 4 deletions distributed/diagnostics/graph_layout.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import uuid

from .plugin import SchedulerPlugin


Expand All @@ -11,9 +13,8 @@ class GraphLayout(SchedulerPlugin):
is rendered at /graph on the diagnostic dashboard.
"""

name = "graph-layout"

def __init__(self, scheduler):
self.name = f"graph-layout-{uuid.uuid4()}"
self.x = {}
self.y = {}
self.collision = {}
Expand All @@ -29,8 +30,6 @@ def __init__(self, scheduler):
self.visible_updates = []
self.visible_edge_updates = []

scheduler.add_plugin(self)

if self.scheduler.tasks:
dependencies = {
k: [ds.key for ds in ts.dependencies]
Expand Down
10 changes: 7 additions & 3 deletions distributed/diagnostics/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from tlz import groupby, valmap

from dask.base import tokenize
from dask.utils import stringify

from ..utils import key_split
Expand Down Expand Up @@ -60,8 +61,8 @@ class Progress(SchedulerPlugin):
notably TextProgressBar and ProgressWidget, which do perform visualization.
"""

def __init__(self, keys, scheduler, minimum=0, dt=0.1, complete=False):
self.name = "Progress"
def __init__(self, keys, scheduler, minimum=0, dt=0.1, complete=False, name=None):
self.name = name or f"progress-{tokenize(keys, minimum, dt, complete)}"
self.keys = {k.key if hasattr(k, "key") else k for k in keys}
self.keys = {stringify(k) for k in self.keys}
self.scheduler = scheduler
Expand Down Expand Up @@ -160,7 +161,10 @@ def __init__(
self, keys, scheduler=None, func=key_split, minimum=0, dt=0.1, complete=False
):
self.func = func
super().__init__(keys, scheduler, minimum=minimum, dt=dt, complete=complete)
name = f"multi-progress-{tokenize(keys, func, minimum, dt, complete)}"
super().__init__(
keys, scheduler, minimum=minimum, dt=dt, complete=complete, name=name
)

async def setup(self):
keys = self.keys
Expand Down
1 change: 0 additions & 1 deletion distributed/diagnostics/task_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ def __init__(self, scheduler, maxlen=None):
)
self.buffer = deque(maxlen=maxlen)
self.scheduler = scheduler
scheduler.add_plugin(self)
self.index = 0

def transition(self, key, start, finish, *args, **kwargs):
Expand Down
6 changes: 6 additions & 0 deletions distributed/diagnostics/tests/test_graph_layout.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
@gen_cluster(client=True)
async def test_basic(c, s, a, b):
gl = GraphLayout(s)
s.add_plugin(gl)
futures = c.map(inc, range(5))
total = c.submit(sum, futures)

Expand All @@ -28,6 +29,7 @@ async def test_construct_after_call(c, s, a, b):
await total

gl = GraphLayout(s)
s.add_plugin(gl)

assert len(gl.x) == len(gl.y) == 6
assert all(gl.x[f.key] == 0 for f in futures)
Expand All @@ -38,6 +40,7 @@ async def test_construct_after_call(c, s, a, b):
@gen_cluster(client=True)
async def test_states(c, s, a, b):
gl = GraphLayout(s)
s.add_plugin(gl)
await c.submit(sum, c.map(inc, range(5)))

while True:
Expand All @@ -50,6 +53,7 @@ async def test_states(c, s, a, b):
@gen_cluster(client=True)
async def test_release_tasks(c, s, a, b):
gl = GraphLayout(s)
s.add_plugin(gl)
futures = c.map(inc, range(5))
total = c.submit(sum, futures)

Expand All @@ -66,6 +70,7 @@ async def test_release_tasks(c, s, a, b):
@gen_cluster(client=True)
async def test_forget(c, s, a, b):
gl = GraphLayout(s)
s.add_plugin(gl)

futures = c.map(inc, range(10))
futures = c.map(inc, futures)
Expand All @@ -84,6 +89,7 @@ async def test_forget(c, s, a, b):
@gen_cluster(client=True)
async def test_unique_positions(c, s, a, b):
gl = GraphLayout(s)
s.add_plugin(gl)

x = c.submit(inc, 1)
ys = [c.submit(operator.add, x, i) for i in range(5)]
Expand Down
3 changes: 3 additions & 0 deletions distributed/diagnostics/tests/test_task_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
async def test_TaskStreamPlugin(c, s, *workers):
es = TaskStreamPlugin(s)
s.add_plugin(es)
assert not es.buffer

futures = c.map(div, [1] * 10, range(10))
Expand Down Expand Up @@ -46,6 +47,7 @@ async def test_TaskStreamPlugin(c, s, *workers):
@gen_cluster(client=True)
async def test_maxlen(c, s, a, b):
tasks = TaskStreamPlugin(s, maxlen=5)
s.add_plugin(tasks)
futures = c.map(inc, range(10))
await wait(futures)
assert len(tasks.buffer) == 5
Expand All @@ -54,6 +56,7 @@ async def test_maxlen(c, s, a, b):
@gen_cluster(client=True)
async def test_collect(c, s, a, b):
tasks = TaskStreamPlugin(s)
s.add_plugin(tasks)
start = time()
futures = c.map(slowinc, range(10), delay=0.1)
await wait(futures)
Expand Down