From d44999a6820cbed5b5da76caae698d54e1cf9d00 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 24 Aug 2021 16:37:38 -0400 Subject: [PATCH 1/9] use name= argument with Scheduler.remove_plugin (silences new warning) --- distributed/dashboard/components/scheduler.py | 2 +- distributed/diagnostics/eventstream.py | 2 +- distributed/diagnostics/tests/test_scheduler_plugin.py | 4 ++-- distributed/diagnostics/websocket.py | 3 +++ distributed/http/scheduler/info.py | 2 +- distributed/scheduler.py | 8 +++++--- 6 files changed, 13 insertions(+), 8 deletions(-) diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index c9dddf1ec56..1465c1ddf99 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -2113,7 +2113,7 @@ def patch_updates(self): self.edge_source.patch({"visible": updates}) def __del__(self): - self.scheduler.remove_plugin(self.layout) + self.scheduler.remove_plugin(name=self.layout.name) class TaskGroupGraph(DashboardComponent): diff --git a/distributed/diagnostics/eventstream.py b/distributed/diagnostics/eventstream.py index 1f4c096f7e0..4ace48282e4 100644 --- a/distributed/diagnostics/eventstream.py +++ b/distributed/diagnostics/eventstream.py @@ -29,7 +29,7 @@ def swap_buffer(scheduler, es): def teardown(scheduler, es): - scheduler.remove_plugin(es) + scheduler.remove_plugin(name=es.name) async def eventstream(address, interval): diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index 95b74f252eb..caf5b34af93 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -67,7 +67,7 @@ def remove_worker(self, worker, scheduler): ] events[:] = [] - s.remove_plugin(plugin) + s.remove_plugin(name=plugin.name) a = await Worker(s.address) await a.close() assert events == [] @@ -104,7 +104,7 @@ async def remove_worker(self, worker, scheduler): } events[:] = [] - s.remove_plugin(plugin) + s.remove_plugin(name=plugin.name) async with Worker(s.address): pass assert events == [] diff --git a/distributed/diagnostics/websocket.py b/distributed/diagnostics/websocket.py index 3796f776036..84aed343606 100644 --- a/distributed/diagnostics/websocket.py +++ b/distributed/diagnostics/websocket.py @@ -4,6 +4,9 @@ class WebsocketPlugin(SchedulerPlugin): + + name = "websocket" + def __init__(self, socket, scheduler): self.socket = socket self.scheduler = scheduler diff --git a/distributed/http/scheduler/info.py b/distributed/http/scheduler/info.py index 44197141e9e..096180e1956 100644 --- a/distributed/http/scheduler/info.py +++ b/distributed/http/scheduler/info.py @@ -204,7 +204,7 @@ def on_message(self, message): self.send("pong", {"timestamp": str(datetime.now())}) def on_close(self): - self.server.remove_plugin(self.plugin) + self.server.remove_plugin(name=self.plugin.name) routes = [ diff --git a/distributed/scheduler.py b/distributed/scheduler.py index f89e486356d..a4011338c1f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7037,7 +7037,7 @@ def stop_task_metadata(self, comm=None, name=None): ) plugin = plugins[0] - self.remove_plugin(plugin) + self.remove_plugin(name=plugin.name) return {"metadata": plugin.metadata, "state": plugin.state} async def register_worker_plugin(self, comm, plugin, name=None): @@ -8150,6 +8150,8 @@ class WorkerStatusPlugin(SchedulerPlugin): scheduler. """ + name = "worker-status" + def __init__(self, scheduler, comm): self.bcomm = BatchedSend(interval="5ms") self.bcomm.start(comm) @@ -8164,13 +8166,13 @@ def add_worker(self, worker=None, **kwargs): try: self.bcomm.send(["add", {"workers": {worker: ident}}]) except CommClosedError: - self.scheduler.remove_plugin(self) + self.scheduler.remove_plugin(name=self.name) def remove_worker(self, worker=None, **kwargs): try: self.bcomm.send(["remove", worker]) except CommClosedError: - self.scheduler.remove_plugin(self) + self.scheduler.remove_plugin(name=self.name) def teardown(self): self.bcomm.close() From 848742175f4e48c503179d726c4eefec29cb59aa Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 25 Aug 2021 11:42:00 -0400 Subject: [PATCH 2/9] fix remaining remove_plugin warnings --- distributed/diagnostics/progress.py | 2 ++ distributed/diagnostics/progress_stream.py | 5 +++-- distributed/diagnostics/tests/test_scheduler_plugin.py | 3 ++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/distributed/diagnostics/progress.py b/distributed/diagnostics/progress.py index abdfc2cf96b..22ac2cb5642 100644 --- a/distributed/diagnostics/progress.py +++ b/distributed/diagnostics/progress.py @@ -240,6 +240,8 @@ def format_time(t): class AllProgress(SchedulerPlugin): """Keep track of all keys, grouped by key_split""" + name = "all-progress" + def __init__(self, scheduler): self.all = defaultdict(set) self.nbytes = defaultdict(lambda: 0) diff --git a/distributed/diagnostics/progress_stream.py b/distributed/diagnostics/progress_stream.py index 57b0cb38390..0d9f514cd37 100644 --- a/distributed/diagnostics/progress_stream.py +++ b/distributed/diagnostics/progress_stream.py @@ -1,4 +1,5 @@ import logging +from functools import partial from tlz import merge, valmap @@ -24,7 +25,7 @@ def counts(scheduler, allprogress): def remove_plugin(*args, **kwargs): # Wrapper function around `Scheduler.remove_plugin` to avoid raising a # `PicklingError` when using a cythonized scheduler - return Scheduler.remove_plugin(*args, **kwargs) + return Scheduler.remove_plugin(**kwargs) async def progress_stream(address, interval): @@ -53,7 +54,7 @@ async def progress_stream(address, interval): "setup": dumps_function(AllProgress), "function": dumps_function(counts), "interval": interval, - "teardown": dumps_function(remove_plugin), + "teardown": dumps_function(partial(remove_plugin, name=AllProgress.name)), } ) return comm diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index caf5b34af93..c51d684ae01 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -117,7 +117,8 @@ async def start(self, scheduler): s.add_plugin(plugin) s.add_plugin(plugin, name="another") with pytest.raises(ValueError) as excinfo: - s.remove_plugin(plugin) + with pytest.warns(UserWarning, match="Removing scheduler plugins by value"): + s.remove_plugin(plugin) msg = str(excinfo.value) assert "Multiple instances of" in msg From 642a011db783c67aa5eb5aab1af799d023dc6437 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 25 Aug 2021 11:51:48 -0400 Subject: [PATCH 3/9] remove unused *args --- distributed/diagnostics/progress_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/diagnostics/progress_stream.py b/distributed/diagnostics/progress_stream.py index 0d9f514cd37..a9c02846e14 100644 --- a/distributed/diagnostics/progress_stream.py +++ b/distributed/diagnostics/progress_stream.py @@ -22,7 +22,7 @@ def counts(scheduler, allprogress): ) -def remove_plugin(*args, **kwargs): +def remove_plugin(**kwargs): # Wrapper function around `Scheduler.remove_plugin` to avoid raising a # `PicklingError` when using a cythonized scheduler return Scheduler.remove_plugin(**kwargs) From 3a3b12884684fc70d19ec70bc0e0cc283a96207f Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 25 Aug 2021 11:53:37 -0400 Subject: [PATCH 4/9] FutureWarning --- distributed/diagnostics/tests/test_scheduler_plugin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index c51d684ae01..09a7c39fbb0 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -117,7 +117,7 @@ async def start(self, scheduler): s.add_plugin(plugin) s.add_plugin(plugin, name="another") with pytest.raises(ValueError) as excinfo: - with pytest.warns(UserWarning, match="Removing scheduler plugins by value"): + with pytest.warns(FutureWarning, match="Removing scheduler plugins by value"): s.remove_plugin(plugin) msg = str(excinfo.value) From 060ac98730f0e33f6d255730dc59e237b0f79c1b Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 25 Aug 2021 14:08:46 -0400 Subject: [PATCH 5/9] swap order --- distributed/diagnostics/tests/test_scheduler_plugin.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index 09a7c39fbb0..f895f2ae85f 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -116,8 +116,8 @@ async def start(self, scheduler): plugin = UnnamedPlugin() s.add_plugin(plugin) s.add_plugin(plugin, name="another") - with pytest.raises(ValueError) as excinfo: - with pytest.warns(FutureWarning, match="Removing scheduler plugins by value"): + with pytest.warns(FutureWarning, match="Removing scheduler plugins by value"): + with pytest.raises(ValueError) as excinfo: s.remove_plugin(plugin) msg = str(excinfo.value) From 2afcfa53d9ca69470c9d51531cc7a50855c5eca0 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 25 Aug 2021 15:38:00 -0400 Subject: [PATCH 6/9] fix existing plugin warning in TaskStreamPlugin --- distributed/diagnostics/task_stream.py | 1 - distributed/diagnostics/tests/test_task_stream.py | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/distributed/diagnostics/task_stream.py b/distributed/diagnostics/task_stream.py index 120be52a38e..60dda656faa 100644 --- a/distributed/diagnostics/task_stream.py +++ b/distributed/diagnostics/task_stream.py @@ -27,7 +27,6 @@ def __init__(self, scheduler, maxlen=None): ) self.buffer = deque(maxlen=maxlen) self.scheduler = scheduler - scheduler.add_plugin(self) self.index = 0 def transition(self, key, start, finish, *args, **kwargs): diff --git a/distributed/diagnostics/tests/test_task_stream.py b/distributed/diagnostics/tests/test_task_stream.py index c65fdb5b573..3407ec1a59a 100644 --- a/distributed/diagnostics/tests/test_task_stream.py +++ b/distributed/diagnostics/tests/test_task_stream.py @@ -14,6 +14,7 @@ @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) async def test_TaskStreamPlugin(c, s, *workers): es = TaskStreamPlugin(s) + s.add_plugin(es) assert not es.buffer futures = c.map(div, [1] * 10, range(10)) @@ -46,6 +47,7 @@ async def test_TaskStreamPlugin(c, s, *workers): @gen_cluster(client=True) async def test_maxlen(c, s, a, b): tasks = TaskStreamPlugin(s, maxlen=5) + s.add_plugin(tasks) futures = c.map(inc, range(10)) await wait(futures) assert len(tasks.buffer) == 5 @@ -54,6 +56,7 @@ async def test_maxlen(c, s, a, b): @gen_cluster(client=True) async def test_collect(c, s, a, b): tasks = TaskStreamPlugin(s) + s.add_plugin(tasks) start = time() futures = c.map(slowinc, range(10), delay=0.1) await wait(futures) From 916643f212d5cea87ab879ff0f78988953737b90 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 3 Sep 2021 11:41:30 -0500 Subject: [PATCH 7/9] Graph layout plugin fixup --- distributed/dashboard/components/scheduler.py | 6 +++++- distributed/diagnostics/graph_layout.py | 2 -- distributed/diagnostics/tests/test_graph_layout.py | 6 ++++++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index 1465c1ddf99..a6a5db56d1a 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -1942,7 +1942,11 @@ class TaskGraph(DashboardComponent): def __init__(self, scheduler, **kwargs): self.scheduler = scheduler - self.layout = GraphLayout(scheduler) + if GraphLayout.name in scheduler.plugins: + self.layout = scheduler.plugins[GraphLayout.name] + else: + self.layout = GraphLayout(scheduler) + scheduler.add_plugin(self.layout) self.invisible_count = 0 # number of invisible nodes self.node_source = ColumnDataSource( diff --git a/distributed/diagnostics/graph_layout.py b/distributed/diagnostics/graph_layout.py index 10f5dd8058a..bfcd2424b2c 100644 --- a/distributed/diagnostics/graph_layout.py +++ b/distributed/diagnostics/graph_layout.py @@ -29,8 +29,6 @@ def __init__(self, scheduler): self.visible_updates = [] self.visible_edge_updates = [] - scheduler.add_plugin(self) - if self.scheduler.tasks: dependencies = { k: [ds.key for ds in ts.dependencies] diff --git a/distributed/diagnostics/tests/test_graph_layout.py b/distributed/diagnostics/tests/test_graph_layout.py index 494cd26e3ca..97b76667df2 100644 --- a/distributed/diagnostics/tests/test_graph_layout.py +++ b/distributed/diagnostics/tests/test_graph_layout.py @@ -9,6 +9,7 @@ @gen_cluster(client=True) async def test_basic(c, s, a, b): gl = GraphLayout(s) + s.add_plugin(gl) futures = c.map(inc, range(5)) total = c.submit(sum, futures) @@ -28,6 +29,7 @@ async def test_construct_after_call(c, s, a, b): await total gl = GraphLayout(s) + s.add_plugin(gl) assert len(gl.x) == len(gl.y) == 6 assert all(gl.x[f.key] == 0 for f in futures) @@ -38,6 +40,7 @@ async def test_construct_after_call(c, s, a, b): @gen_cluster(client=True) async def test_states(c, s, a, b): gl = GraphLayout(s) + s.add_plugin(gl) await c.submit(sum, c.map(inc, range(5))) while True: @@ -50,6 +53,7 @@ async def test_states(c, s, a, b): @gen_cluster(client=True) async def test_release_tasks(c, s, a, b): gl = GraphLayout(s) + s.add_plugin(gl) futures = c.map(inc, range(5)) total = c.submit(sum, futures) @@ -66,6 +70,7 @@ async def test_release_tasks(c, s, a, b): @gen_cluster(client=True) async def test_forget(c, s, a, b): gl = GraphLayout(s) + s.add_plugin(gl) futures = c.map(inc, range(10)) futures = c.map(inc, futures) @@ -84,6 +89,7 @@ async def test_forget(c, s, a, b): @gen_cluster(client=True) async def test_unique_positions(c, s, a, b): gl = GraphLayout(s) + s.add_plugin(gl) x = c.submit(inc, 1) ys = [c.submit(operator.add, x, i) for i in range(5)] From 2b74ed369b2a6cc101d3f104beabb25c368dc363 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 3 Sep 2021 11:53:12 -0500 Subject: [PATCH 8/9] Use unique names for task progress plugins --- distributed/diagnostics/progress.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/distributed/diagnostics/progress.py b/distributed/diagnostics/progress.py index 22ac2cb5642..4fd6a989fdc 100644 --- a/distributed/diagnostics/progress.py +++ b/distributed/diagnostics/progress.py @@ -5,6 +5,7 @@ from tlz import groupby, valmap +from dask.base import tokenize from dask.utils import stringify from ..utils import key_split @@ -60,8 +61,8 @@ class Progress(SchedulerPlugin): notably TextProgressBar and ProgressWidget, which do perform visualization. """ - def __init__(self, keys, scheduler, minimum=0, dt=0.1, complete=False): - self.name = "Progress" + def __init__(self, keys, scheduler, minimum=0, dt=0.1, complete=False, name=None): + self.name = name or f"progress-{tokenize(keys, minimum, dt, complete)}" self.keys = {k.key if hasattr(k, "key") else k for k in keys} self.keys = {stringify(k) for k in self.keys} self.scheduler = scheduler @@ -160,7 +161,10 @@ def __init__( self, keys, scheduler=None, func=key_split, minimum=0, dt=0.1, complete=False ): self.func = func - super().__init__(keys, scheduler, minimum=minimum, dt=dt, complete=complete) + name = f"multi-progress-{tokenize(keys, func, minimum, dt, complete)}" + super().__init__( + keys, scheduler, minimum=minimum, dt=dt, complete=complete, name=name + ) async def setup(self): keys = self.keys From 6374c452cca097530789dda4d98ed750e9b265ac Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 3 Sep 2021 12:52:42 -0500 Subject: [PATCH 9/9] Use unique names for GraphLayout plugin --- distributed/dashboard/components/scheduler.py | 7 ++----- distributed/diagnostics/graph_layout.py | 5 +++-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index a6a5db56d1a..8501257e9fb 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -1942,11 +1942,8 @@ class TaskGraph(DashboardComponent): def __init__(self, scheduler, **kwargs): self.scheduler = scheduler - if GraphLayout.name in scheduler.plugins: - self.layout = scheduler.plugins[GraphLayout.name] - else: - self.layout = GraphLayout(scheduler) - scheduler.add_plugin(self.layout) + self.layout = GraphLayout(scheduler) + scheduler.add_plugin(self.layout) self.invisible_count = 0 # number of invisible nodes self.node_source = ColumnDataSource( diff --git a/distributed/diagnostics/graph_layout.py b/distributed/diagnostics/graph_layout.py index bfcd2424b2c..5e48f939085 100644 --- a/distributed/diagnostics/graph_layout.py +++ b/distributed/diagnostics/graph_layout.py @@ -1,3 +1,5 @@ +import uuid + from .plugin import SchedulerPlugin @@ -11,9 +13,8 @@ class GraphLayout(SchedulerPlugin): is rendered at /graph on the diagnostic dashboard. """ - name = "graph-layout" - def __init__(self, scheduler): + self.name = f"graph-layout-{uuid.uuid4()}" self.x = {} self.y = {} self.collision = {}