diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index 1465c1ddf99..8501257e9fb 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -1943,6 +1943,7 @@ class TaskGraph(DashboardComponent): def __init__(self, scheduler, **kwargs): self.scheduler = scheduler self.layout = GraphLayout(scheduler) + scheduler.add_plugin(self.layout) self.invisible_count = 0 # number of invisible nodes self.node_source = ColumnDataSource( diff --git a/distributed/diagnostics/graph_layout.py b/distributed/diagnostics/graph_layout.py index 10f5dd8058a..5e48f939085 100644 --- a/distributed/diagnostics/graph_layout.py +++ b/distributed/diagnostics/graph_layout.py @@ -1,3 +1,5 @@ +import uuid + from .plugin import SchedulerPlugin @@ -11,9 +13,8 @@ class GraphLayout(SchedulerPlugin): is rendered at /graph on the diagnostic dashboard. """ - name = "graph-layout" - def __init__(self, scheduler): + self.name = f"graph-layout-{uuid.uuid4()}" self.x = {} self.y = {} self.collision = {} @@ -29,8 +30,6 @@ def __init__(self, scheduler): self.visible_updates = [] self.visible_edge_updates = [] - scheduler.add_plugin(self) - if self.scheduler.tasks: dependencies = { k: [ds.key for ds in ts.dependencies] diff --git a/distributed/diagnostics/progress.py b/distributed/diagnostics/progress.py index 22ac2cb5642..4fd6a989fdc 100644 --- a/distributed/diagnostics/progress.py +++ b/distributed/diagnostics/progress.py @@ -5,6 +5,7 @@ from tlz import groupby, valmap +from dask.base import tokenize from dask.utils import stringify from ..utils import key_split @@ -60,8 +61,8 @@ class Progress(SchedulerPlugin): notably TextProgressBar and ProgressWidget, which do perform visualization. """ - def __init__(self, keys, scheduler, minimum=0, dt=0.1, complete=False): - self.name = "Progress" + def __init__(self, keys, scheduler, minimum=0, dt=0.1, complete=False, name=None): + self.name = name or f"progress-{tokenize(keys, minimum, dt, complete)}" self.keys = {k.key if hasattr(k, "key") else k for k in keys} self.keys = {stringify(k) for k in self.keys} self.scheduler = scheduler @@ -160,7 +161,10 @@ def __init__( self, keys, scheduler=None, func=key_split, minimum=0, dt=0.1, complete=False ): self.func = func - super().__init__(keys, scheduler, minimum=minimum, dt=dt, complete=complete) + name = f"multi-progress-{tokenize(keys, func, minimum, dt, complete)}" + super().__init__( + keys, scheduler, minimum=minimum, dt=dt, complete=complete, name=name + ) async def setup(self): keys = self.keys diff --git a/distributed/diagnostics/task_stream.py b/distributed/diagnostics/task_stream.py index 120be52a38e..60dda656faa 100644 --- a/distributed/diagnostics/task_stream.py +++ b/distributed/diagnostics/task_stream.py @@ -27,7 +27,6 @@ def __init__(self, scheduler, maxlen=None): ) self.buffer = deque(maxlen=maxlen) self.scheduler = scheduler - scheduler.add_plugin(self) self.index = 0 def transition(self, key, start, finish, *args, **kwargs): diff --git a/distributed/diagnostics/tests/test_graph_layout.py b/distributed/diagnostics/tests/test_graph_layout.py index 494cd26e3ca..97b76667df2 100644 --- a/distributed/diagnostics/tests/test_graph_layout.py +++ b/distributed/diagnostics/tests/test_graph_layout.py @@ -9,6 +9,7 @@ @gen_cluster(client=True) async def test_basic(c, s, a, b): gl = GraphLayout(s) + s.add_plugin(gl) futures = c.map(inc, range(5)) total = c.submit(sum, futures) @@ -28,6 +29,7 @@ async def test_construct_after_call(c, s, a, b): await total gl = GraphLayout(s) + s.add_plugin(gl) assert len(gl.x) == len(gl.y) == 6 assert all(gl.x[f.key] == 0 for f in futures) @@ -38,6 +40,7 @@ async def test_construct_after_call(c, s, a, b): @gen_cluster(client=True) async def test_states(c, s, a, b): gl = GraphLayout(s) + s.add_plugin(gl) await c.submit(sum, c.map(inc, range(5))) while True: @@ -50,6 +53,7 @@ async def test_states(c, s, a, b): @gen_cluster(client=True) async def test_release_tasks(c, s, a, b): gl = GraphLayout(s) + s.add_plugin(gl) futures = c.map(inc, range(5)) total = c.submit(sum, futures) @@ -66,6 +70,7 @@ async def test_release_tasks(c, s, a, b): @gen_cluster(client=True) async def test_forget(c, s, a, b): gl = GraphLayout(s) + s.add_plugin(gl) futures = c.map(inc, range(10)) futures = c.map(inc, futures) @@ -84,6 +89,7 @@ async def test_forget(c, s, a, b): @gen_cluster(client=True) async def test_unique_positions(c, s, a, b): gl = GraphLayout(s) + s.add_plugin(gl) x = c.submit(inc, 1) ys = [c.submit(operator.add, x, i) for i in range(5)] diff --git a/distributed/diagnostics/tests/test_task_stream.py b/distributed/diagnostics/tests/test_task_stream.py index c65fdb5b573..3407ec1a59a 100644 --- a/distributed/diagnostics/tests/test_task_stream.py +++ b/distributed/diagnostics/tests/test_task_stream.py @@ -14,6 +14,7 @@ @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) async def test_TaskStreamPlugin(c, s, *workers): es = TaskStreamPlugin(s) + s.add_plugin(es) assert not es.buffer futures = c.map(div, [1] * 10, range(10)) @@ -46,6 +47,7 @@ async def test_TaskStreamPlugin(c, s, *workers): @gen_cluster(client=True) async def test_maxlen(c, s, a, b): tasks = TaskStreamPlugin(s, maxlen=5) + s.add_plugin(tasks) futures = c.map(inc, range(10)) await wait(futures) assert len(tasks.buffer) == 5 @@ -54,6 +56,7 @@ async def test_maxlen(c, s, a, b): @gen_cluster(client=True) async def test_collect(c, s, a, b): tasks = TaskStreamPlugin(s) + s.add_plugin(tasks) start = time() futures = c.map(slowinc, range(10), delay=0.1) await wait(futures)