From 0e98df187c09864426d40fa164bfc958bdd251fd Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 14 Nov 2023 15:58:04 +0100 Subject: [PATCH 1/5] Idempotent attribute --- distributed/client.py | 7 +- .../diagnostics/tests/test_nanny_plugin.py | 81 ++++++++- .../tests/test_scheduler_plugin.py | 165 ++++++++++++++++++ .../diagnostics/tests/test_worker_plugin.py | 81 ++++++++- distributed/scheduler.py | 32 +++- distributed/shuffle/_scheduler_plugin.py | 2 +- distributed/tests/test_scheduler.py | 40 ----- 7 files changed, 358 insertions(+), 50 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index ae9d90e9e84..0de68a47eab 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -4834,7 +4834,7 @@ def register_plugin( self, plugin: NannyPlugin | SchedulerPlugin | WorkerPlugin, name: str | None = None, - idempotent: bool = False, + idempotent: bool | None = None, ): """Register a plugin. @@ -4849,11 +4849,14 @@ def register_plugin( plugin instance or automatically generated if not present. idempotent : Do not re-register if a plugin of the given name already exists. + If None, ``plugin.idempotent`` is taken if defined, False otherwise. """ if name is None: name = _get_plugin_name(plugin) assert name - + if idempotent is None: + idempotent = getattr(plugin, "idempotent", False) + assert isinstance(idempotent, bool) return self._register_plugin(plugin, name, idempotent) @singledispatchmethod diff --git a/distributed/diagnostics/tests/test_nanny_plugin.py b/distributed/diagnostics/tests/test_nanny_plugin.py index cded39d2cd0..da7bf2186aa 100644 --- a/distributed/diagnostics/tests/test_nanny_plugin.py +++ b/distributed/diagnostics/tests/test_nanny_plugin.py @@ -3,6 +3,7 @@ import pytest from distributed import Nanny, NannyPlugin +from distributed.protocol.pickle import dumps from distributed.utils_test import gen_cluster @@ -58,7 +59,31 @@ def teardown(self, nanny): @gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) -async def test_register_idempotent_plugins(c, s, a): +async def test_register_idempotent_plugin(c, s, a): + class IdempotentPlugin(NannyPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + self.idempotent = True + + def setup(self, nanny): + if self.instance != "first": + raise RuntimeError( + "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + await c.register_plugin(first) + assert "idempotentplugin" in a.plugins + + second = IdempotentPlugin(instance="second") + await c.register_plugin(second) + assert "idempotentplugin" in a.plugins + assert a.plugins["idempotentplugin"].instance == "first" + + +@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) +async def test_register_plugin_with_idempotent_keyword(c, s, a): class IdempotentPlugin(NannyPlugin): def __init__(self, instance=None): self.name = "idempotentplugin" @@ -81,11 +106,41 @@ def setup(self, nanny): @gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) -async def test_register_non_idempotent_plugins(c, s, a): +async def test_register_non_idempotent_plugin(c, s, a): + class NonIdempotentPlugin(NannyPlugin): + def __init__(self, instance=None): + self.name = "nonidempotentplugin" + self.instance = instance + + first = NonIdempotentPlugin(instance="first") + await c.register_plugin(first) + assert "nonidempotentplugin" in a.plugins + + second = NonIdempotentPlugin(instance="second") + await c.register_plugin(second) + assert "nonidempotentplugin" in a.plugins + assert a.plugins["nonidempotentplugin"].instance == "second" + + third = NonIdempotentPlugin(instance="third") + with pytest.warns( + FutureWarning, + match="`SchedulerPlugin.register_nanny_plugin` now requires `idempotent`", + ): + await s.register_nanny_plugin( + comm=None, plugin=dumps(third), name="nonidempotentplugin" + ) + assert "nonidempotentplugin" in a.plugins + assert a.plugins["nonidempotentplugin"].instance == "third" + + +@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) +async def test_register_plugin_with_idempotent_keyword_overrules_plugin(c, s, a): class NonIdempotentPlugin(NannyPlugin): def __init__(self, instance=None): self.name = "nonidempotentplugin" self.instance = instance + # We want to overrule this + self.idempotent = True first = NonIdempotentPlugin(instance="first") await c.register_plugin(first, idempotent=False) @@ -95,3 +150,25 @@ def __init__(self, instance=None): await c.register_plugin(second, idempotent=False) assert "nonidempotentplugin" in a.plugins assert a.plugins["nonidempotentplugin"].instance == "second" + + class IdempotentPlugin(NannyPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + # We want to overrule this + self.idempotent = False + + def setup(self, nanny): + if self.instance != "first": + raise RuntimeError( + "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + await c.register_plugin(first, idempotent=True) + assert "idempotentplugin" in a.plugins + + second = IdempotentPlugin(instance="second") + await c.register_plugin(second, idempotent=True) + assert "idempotentplugin" in a.plugins + assert a.plugins["idempotentplugin"].instance == "first" diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index 49c37149caf..b1e6396b922 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -641,3 +641,168 @@ def stop(self, scheduler): await c.register_worker_plugin(DuckPlugin(), nanny=True) assert len(s.plugins) == n_existing_plugins + 1 assert s.foo == 123 + + +@gen_cluster(client=True, nthreads=[]) +async def test_register_idempotent_plugin(c, s): + class IdempotentPlugin(Scheduler): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + self.idempotent = True + + def start(self, scheduler): + if self.instance != "first": + raise RuntimeError( + "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + await c.register_plugin(first) + assert "idempotentplugin" in s.plugins + + second = IdempotentPlugin(instance="second") + await c.register_plugin(second) + assert "idempotentplugin" in s.plugins + assert s.plugins["idempotentplugin"].instance == "first" + + +@gen_cluster(client=True, nthreads=[]) +async def test_register_plugin_with_idempotent_keyword(c, s): + class IdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + + def start(self, scheduler): + if self.instance != "first": + raise RuntimeError( + "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + await c.register_plugin(first, idempotent=True) + assert "idempotentplugin" in s.plugins + + second = IdempotentPlugin(instance="second") + await c.register_plugin(second, idempotent=True) + assert "idempotentplugin" in s.plugins + assert s.plugins["idempotentplugin"].instance == "first" + + +@gen_cluster(client=True, nthreads=[]) +async def test_register_non_idempotent_plugin(c, s): + class NonIdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "nonidempotentplugin" + self.instance = instance + + first = NonIdempotentPlugin(instance="first") + await c.register_plugin(first) + assert "nonidempotentplugin" in s.plugins + + second = NonIdempotentPlugin(instance="second") + await c.register_plugin(second) + assert "nonidempotentplugin" in s.plugins + assert s.plugins["nonidempotentplugin"].instance == "second" + + third = NonIdempotentPlugin(instance="third") + with pytest.warns( + FutureWarning, + match="`SchedulerPlugin.register_scheduler_plugin` now requires `idempotent`", + ): + await s.register_scheduler_plugin( + comm=None, plugin=dumps(third), name="nonidempotentplugin" + ) + assert "nonidempotentplugin" in s.plugins + assert s.plugins["nonidempotentplugin"].instance == "third" + + +@gen_cluster(client=True, nthreads=[]) +async def test_register_plugin_with_idempotent_keyword_overrules_plugin(c, s): + class NonIdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "nonidempotentplugin" + self.instance = instance + # We want to overrule this + self.idempotent = True + + first = NonIdempotentPlugin(instance="first") + await c.register_plugin(first, idempotent=False) + assert "nonidempotentplugin" in s.plugins + + second = NonIdempotentPlugin(instance="second") + await c.register_plugin(second, idempotent=False) + assert "nonidempotentplugin" in s.plugins + assert s.plugins["nonidempotentplugin"].instance == "second" + + class IdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + # We want to overrule this + self.idempotent = False + + def start(self, scheduler): + if self.instance != "first": + raise RuntimeError( + "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + await c.register_plugin(first, idempotent=True) + assert "idempotentplugin" in s.plugins + + second = IdempotentPlugin(instance="second") + await c.register_plugin(second, idempotent=True) + assert "idempotentplugin" in s.plugins + assert s.plugins["idempotentplugin"].instance == "first" + + +@gen_cluster(nthreads=[]) +async def test_register_idempotent_plugins_directly(s): + class IdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + + def start(self, scheduler): + if self.instance != "first": + raise RuntimeError( + "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + await s.register_scheduler_plugin(plugin=dumps(first), idempotent=True) + assert "idempotentplugin" in s.plugins + + second = IdempotentPlugin(instance="second") + await s.register_scheduler_plugin(plugin=dumps(second), idempotent=True) + assert "idempotentplugin" in s.plugins + assert s.plugins["idempotentplugin"].instance == "first" + + +@gen_cluster(nthreads=[]) +async def test_register_non_idempotent_plugins_directly(s): + class NonIdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "nonidempotentplugin" + self.instance = instance + + first = NonIdempotentPlugin(instance="first") + await s.register_scheduler_plugin(plugin=dumps(first), idempotent=False) + assert "nonidempotentplugin" in s.plugins + + second = NonIdempotentPlugin(instance="second") + await s.register_scheduler_plugin(plugin=dumps(second), idempotent=False) + assert "nonidempotentplugin" in s.plugins + assert s.plugins["nonidempotentplugin"].instance == "second" + + third = NonIdempotentPlugin(instance="third") + with pytest.warns( + FutureWarning, + match="`Scheduler.register_scheduler_plugin` now requires `idempotent`", + ): + await s.register_scheduler_plugin(plugin=dumps(third)) + assert "nonidempotentplugin" in s.plugins + assert s.plugins["nonidempotentplugin"].instance == "third" diff --git a/distributed/diagnostics/tests/test_worker_plugin.py b/distributed/diagnostics/tests/test_worker_plugin.py index 64fffd42498..de59447bc57 100644 --- a/distributed/diagnostics/tests/test_worker_plugin.py +++ b/distributed/diagnostics/tests/test_worker_plugin.py @@ -6,6 +6,7 @@ import pytest from distributed import Worker, WorkerPlugin +from distributed.protocol.pickle import dumps from distributed.utils_test import async_poll_for, gen_cluster, inc @@ -322,7 +323,31 @@ def teardown(self, worker): @gen_cluster(client=True, nthreads=[("", 1)]) -async def test_register_idempotent_plugins(c, s, a): +async def test_register_idempotent_plugin(c, s, a): + class IdempotentPlugin(WorkerPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + self.idempotent = True + + def setup(self, worker): + if self.instance != "first": + raise RuntimeError( + "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + await c.register_plugin(first) + assert "idempotentplugin" in a.plugins + + second = IdempotentPlugin(instance="second") + await c.register_plugin(second) + assert "idempotentplugin" in a.plugins + assert a.plugins["idempotentplugin"].instance == "first" + + +@gen_cluster(client=True, nthreads=[("", 1)]) +async def test_register_plugin_with_idempotent_keyword(c, s, a): class IdempotentPlugin(WorkerPlugin): def __init__(self, instance=None): self.name = "idempotentplugin" @@ -345,12 +370,42 @@ def setup(self, worker): @gen_cluster(client=True, nthreads=[("", 1)]) -async def test_register_non_idempotent_plugins(c, s, a): +async def test_register_non_idempotent_plugin(c, s, a): class NonIdempotentPlugin(WorkerPlugin): def __init__(self, instance=None): self.name = "nonidempotentplugin" self.instance = instance + first = NonIdempotentPlugin(instance="first") + await c.register_plugin(first) + assert "nonidempotentplugin" in a.plugins + + second = NonIdempotentPlugin(instance="second") + await c.register_plugin(second) + assert "nonidempotentplugin" in a.plugins + assert a.plugins["nonidempotentplugin"].instance == "second" + + third = NonIdempotentPlugin(instance="third") + with pytest.warns( + FutureWarning, + match="`SchedulerPlugin.register_worker_plugin` now requires `idempotent`", + ): + await s.register_worker_plugin( + comm=None, plugin=dumps(third), name="nonidempotentplugin" + ) + assert "nonidempotentplugin" in a.plugins + assert a.plugins["nonidempotentplugin"].instance == "third" + + +@gen_cluster(client=True, nthreads=[("", 1)]) +async def test_register_plugin_with_idempotent_keyword_overrules_plugin(c, s, a): + class NonIdempotentPlugin(WorkerPlugin): + def __init__(self, instance=None): + self.name = "nonidempotentplugin" + self.instance = instance + # We want to overrule this + self.idempotent = True + first = NonIdempotentPlugin(instance="first") await c.register_plugin(first, idempotent=False) assert "nonidempotentplugin" in a.plugins @@ -359,3 +414,25 @@ def __init__(self, instance=None): await c.register_plugin(second, idempotent=False) assert "nonidempotentplugin" in a.plugins assert a.plugins["nonidempotentplugin"].instance == "second" + + class IdempotentPlugin(WorkerPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + # We want to overrule this + self.idempotent = False + + def setup(self, worker): + if self.instance != "first": + raise RuntimeError( + "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + await c.register_plugin(first, idempotent=True) + assert "idempotentplugin" in a.plugins + + second = IdempotentPlugin(instance="second") + await c.register_plugin(second, idempotent=True) + assert "idempotentplugin" in a.plugins + assert a.plugins["idempotentplugin"].instance == "first" diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 6c1e08ea513..441e8b22d41 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5824,7 +5824,7 @@ async def register_scheduler_plugin( self, plugin: bytes | SchedulerPlugin, name: str | None = None, - idempotent: bool = False, + idempotent: bool | None = None, ) -> None: """Register a plugin on the scheduler.""" if not dask.config.get("distributed.scheduler.pickle"): @@ -5834,6 +5834,14 @@ async def register_scheduler_plugin( "arbitrary bytestrings using pickle via the " "'distributed.scheduler.pickle' configuration setting." ) + if idempotent is None: + warnings.warn( + "The signature of `SchedulerPlugin.register_scheduler_plugin` now requires " + "`idempotent`. Not including `idempotent` in the signature will no longer " + "be supported in future versions.", + FutureWarning, + ) + idempotent = False if not isinstance(plugin, SchedulerPlugin): plugin = loads(plugin) assert isinstance(plugin, SchedulerPlugin) @@ -7539,10 +7547,18 @@ def stop_task_metadata(self, name: str | None = None) -> dict: return {"metadata": plugin.metadata, "state": plugin.state} async def register_worker_plugin( - self, comm: None, plugin: bytes, name: str, idempotent: bool = False + self, comm: None, plugin: bytes, name: str, idempotent: bool | None = None ) -> dict[str, OKMessage]: """Registers a worker plugin on all running and future workers""" logger.info("Registering Worker plugin %s", name) + if idempotent is None: + warnings.warn( + "The signature of `SchedulerPlugin.register_worker_plugin` now requires " + "`idempotent`. Not including `idempotent` in the signature will no longer " + "be supported in future versions.", + FutureWarning, + ) + idempotent = False if name in self.worker_plugins and idempotent: return {} @@ -7566,10 +7582,20 @@ async def unregister_worker_plugin( return responses async def register_nanny_plugin( - self, comm: None, plugin: bytes, name: str, idempotent: bool = False + self, comm: None, plugin: bytes, name: str, idempotent: bool | None = None ) -> dict[str, OKMessage]: """Registers a nanny plugin on all running and future nannies""" logger.info("Registering Nanny plugin %s", name) + + if idempotent is None: + warnings.warn( + "The signature of `SchedulerPlugin.register_nanny_plugin` now requires " + "`idempotent`. Not including `idempotent` in the signature will no longer " + "be supported in future versions.", + FutureWarning, + ) + idempotent = False + if name in self.nanny_plugins and idempotent: return {} diff --git a/distributed/shuffle/_scheduler_plugin.py b/distributed/shuffle/_scheduler_plugin.py index ea3864d9134..2617abe65b7 100644 --- a/distributed/shuffle/_scheduler_plugin.py +++ b/distributed/shuffle/_scheduler_plugin.py @@ -69,7 +69,7 @@ def __init__(self, scheduler: Scheduler): async def start(self, scheduler: Scheduler) -> None: worker_plugin = ShuffleWorkerPlugin() await self.scheduler.register_worker_plugin( - None, dumps(worker_plugin), name="shuffle" + None, dumps(worker_plugin), name="shuffle", idempotent=False ) def shuffle_ids(self) -> set[ShuffleId]: diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 8ff3ab092e6..df42b88cc7b 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4176,46 +4176,6 @@ async def test_dump_cluster_state(s, *workers, format): fs.rm("state-dumps", recursive=True) -@gen_cluster(nthreads=[]) -async def test_idempotent_plugins(s): - class IdempotentPlugin(SchedulerPlugin): - def __init__(self, instance=None): - self.name = "idempotentplugin" - self.instance = instance - - def start(self, scheduler): - if self.instance != "first": - raise RuntimeError( - "Only the first plugin should be started when idempotent is set" - ) - - first = IdempotentPlugin(instance="first") - await s.register_scheduler_plugin(plugin=dumps(first), idempotent=True) - assert "idempotentplugin" in s.plugins - - second = IdempotentPlugin(instance="second") - await s.register_scheduler_plugin(plugin=dumps(second), idempotent=True) - assert "idempotentplugin" in s.plugins - assert s.plugins["idempotentplugin"].instance == "first" - - -@gen_cluster(nthreads=[]) -async def test_non_idempotent_plugins(s): - class NonIdempotentPlugin(SchedulerPlugin): - def __init__(self, instance=None): - self.name = "nonidempotentplugin" - self.instance = instance - - first = NonIdempotentPlugin(instance="first") - await s.register_scheduler_plugin(plugin=dumps(first), idempotent=False) - assert "nonidempotentplugin" in s.plugins - - second = NonIdempotentPlugin(instance="second") - await s.register_scheduler_plugin(plugin=dumps(second), idempotent=False) - assert "nonidempotentplugin" in s.plugins - assert s.plugins["nonidempotentplugin"].instance == "second" - - @gen_cluster(nthreads=[("", 1)]) async def test_repr(s, a): async with Worker(s.address, nthreads=2) as b: # name = address by default From 09a59624cb3baee225596b5d206c27d68d9dd657 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 14 Nov 2023 16:31:40 +0100 Subject: [PATCH 2/5] Fix --- distributed/diagnostics/plugin.py | 5 ++++- distributed/diagnostics/tests/test_nanny_plugin.py | 2 +- distributed/diagnostics/tests/test_scheduler_plugin.py | 10 +++++----- distributed/diagnostics/tests/test_worker_plugin.py | 2 +- distributed/scheduler.py | 6 +++--- 5 files changed, 14 insertions(+), 11 deletions(-) diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py index e58d4deec77..77e5f1568d9 100644 --- a/distributed/diagnostics/plugin.py +++ b/distributed/diagnostics/plugin.py @@ -413,7 +413,10 @@ async def start(self, scheduler: Scheduler) -> None: self.installer, self.restart_workers, self.name ) await scheduler.register_worker_plugin( - comm=None, plugin=dumps(worker_plugin), name=self.name + comm=None, + plugin=dumps(worker_plugin), + name=self.name, + idempotent=True, ) else: logger.info( diff --git a/distributed/diagnostics/tests/test_nanny_plugin.py b/distributed/diagnostics/tests/test_nanny_plugin.py index da7bf2186aa..ac70f5fd150 100644 --- a/distributed/diagnostics/tests/test_nanny_plugin.py +++ b/distributed/diagnostics/tests/test_nanny_plugin.py @@ -124,7 +124,7 @@ def __init__(self, instance=None): third = NonIdempotentPlugin(instance="third") with pytest.warns( FutureWarning, - match="`SchedulerPlugin.register_nanny_plugin` now requires `idempotent`", + match="`Scheduler.register_nanny_plugin` now requires `idempotent`", ): await s.register_nanny_plugin( comm=None, plugin=dumps(third), name="nonidempotentplugin" diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index b1e6396b922..bb235dc064d 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -426,7 +426,7 @@ def __init__(self): self.name = "plugin" plugin = Plugin() - await s.register_scheduler_plugin(plugin=dumps(plugin)) + await s.register_scheduler_plugin(plugin=dumps(plugin), idempotent=False) assert "plugin" in s.plugins await s.unregister_scheduler_plugin(name="plugin") @@ -478,7 +478,7 @@ class MyPlugin(SchedulerPlugin): async def start(self, scheduler: Scheduler) -> None: scheduler._foo = "bar" # type: ignore - await s.register_scheduler_plugin(MyPlugin()) + await s.register_scheduler_plugin(MyPlugin(), idempotent=False) assert s._foo == "bar" @@ -499,8 +499,8 @@ async def before_close(self): async def close(self): raise Exception("AFTER_CLOSE") - await s.register_scheduler_plugin(OK()) - await s.register_scheduler_plugin(Bad()) + await s.register_scheduler_plugin(OK(), idempotent=False) + await s.register_scheduler_plugin(Bad(), idempotent=False) with captured_logger("distributed.scheduler") as logger: await s.close() @@ -645,7 +645,7 @@ def stop(self, scheduler): @gen_cluster(client=True, nthreads=[]) async def test_register_idempotent_plugin(c, s): - class IdempotentPlugin(Scheduler): + class IdempotentPlugin(SchedulerPlugin): def __init__(self, instance=None): self.name = "idempotentplugin" self.instance = instance diff --git a/distributed/diagnostics/tests/test_worker_plugin.py b/distributed/diagnostics/tests/test_worker_plugin.py index de59447bc57..2bb1ac704b5 100644 --- a/distributed/diagnostics/tests/test_worker_plugin.py +++ b/distributed/diagnostics/tests/test_worker_plugin.py @@ -388,7 +388,7 @@ def __init__(self, instance=None): third = NonIdempotentPlugin(instance="third") with pytest.warns( FutureWarning, - match="`SchedulerPlugin.register_worker_plugin` now requires `idempotent`", + match="`Scheduler.register_worker_plugin` now requires `idempotent`", ): await s.register_worker_plugin( comm=None, plugin=dumps(third), name="nonidempotentplugin" diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 441e8b22d41..04da9bc1d11 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5836,7 +5836,7 @@ async def register_scheduler_plugin( ) if idempotent is None: warnings.warn( - "The signature of `SchedulerPlugin.register_scheduler_plugin` now requires " + "The signature of `Scheduler.register_scheduler_plugin` now requires " "`idempotent`. Not including `idempotent` in the signature will no longer " "be supported in future versions.", FutureWarning, @@ -7553,7 +7553,7 @@ async def register_worker_plugin( logger.info("Registering Worker plugin %s", name) if idempotent is None: warnings.warn( - "The signature of `SchedulerPlugin.register_worker_plugin` now requires " + "The signature of `Scheduler.register_worker_plugin` now requires " "`idempotent`. Not including `idempotent` in the signature will no longer " "be supported in future versions.", FutureWarning, @@ -7589,7 +7589,7 @@ async def register_nanny_plugin( if idempotent is None: warnings.warn( - "The signature of `SchedulerPlugin.register_nanny_plugin` now requires " + "The signature of `Scheduler.register_nanny_plugin` now requires " "`idempotent`. Not including `idempotent` in the signature will no longer " "be supported in future versions.", FutureWarning, From 99073c7b5c0652476c62d1e34dd5329dc82e637e Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 14 Nov 2023 17:42:41 +0100 Subject: [PATCH 3/5] minor --- distributed/diagnostics/tests/test_scheduler_plugin.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index bb235dc064d..9222ebb567a 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -709,10 +709,10 @@ def __init__(self, instance=None): third = NonIdempotentPlugin(instance="third") with pytest.warns( FutureWarning, - match="`SchedulerPlugin.register_scheduler_plugin` now requires `idempotent`", + match="`Scheduler.register_scheduler_plugin` now requires `idempotent`", ): await s.register_scheduler_plugin( - comm=None, plugin=dumps(third), name="nonidempotentplugin" + plugin=dumps(third), name="nonidempotentplugin" ) assert "nonidempotentplugin" in s.plugins assert s.plugins["nonidempotentplugin"].instance == "third" From 4edfb40a4281b61cbe5b5d4fd3539d58ab9c4a33 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 15 Nov 2023 15:00:19 +0100 Subject: [PATCH 4/5] Review comments --- distributed/client.py | 9 ++++- distributed/diagnostics/plugin.py | 25 ++++++++++++- .../diagnostics/tests/test_nanny_plugin.py | 37 +++++-------------- .../tests/test_scheduler_plugin.py | 37 +++++-------------- .../diagnostics/tests/test_worker_plugin.py | 37 +++++-------------- 5 files changed, 58 insertions(+), 87 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 0de68a47eab..9d03ad85bc8 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -4854,7 +4854,14 @@ def register_plugin( if name is None: name = _get_plugin_name(plugin) assert name - if idempotent is None: + if idempotent is not None: + warnings.warn( + "The `idempotent` argument is deprecated and will be removed in a " + "future version. Please mark your plugin as idempotent by setting its " + "`.idempotent` atrribute to `True`.", + FutureWarning, + ) + else: idempotent = getattr(plugin, "idempotent", False) assert isinstance(idempotent, bool) return self._register_plugin(plugin, name, idempotent) diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py index 77e5f1568d9..259428fb2c6 100644 --- a/distributed/diagnostics/plugin.py +++ b/distributed/diagnostics/plugin.py @@ -32,7 +32,8 @@ class SchedulerPlugin: """Interface to extend the Scheduler - A plugin enables custom hooks to run when specific events occur. The scheduler will run the methods of this plugin whenever the corresponding + A plugin enables custom hooks to run when specific events occur. + The scheduler will run the methods of this plugin whenever the corresponding method of the scheduler is run. This runs user code within the scheduler thread that can perform arbitrary operations in synchrony with the scheduler itself. @@ -46,6 +47,11 @@ class SchedulerPlugin: 2. override some of its methods 3. register the plugin using :meth:`Client.register_plugin`. + The ``idempotent`` attribute is used to control whether or not the plugin should + skip installation if a scheduler plugin with the same name is already registered. + If ``True``, the installation is skipped, otherwise the existing plugin is replaced. + Defaults to ``False``. + Examples -------- >>> class Counter(SchedulerPlugin): @@ -63,6 +69,8 @@ class SchedulerPlugin: >>> scheduler.add_plugin(plugin) # doctest: +SKIP """ + idempotent: bool = False + async def start(self, scheduler: Scheduler) -> None: """Run when the scheduler starts up @@ -217,6 +225,11 @@ class WorkerPlugin: 2. override some of its methods 3. register the plugin using :meth:`Client.register_plugin`. + The ``idempotent`` attribute is used to control whether or not the plugin should + skip installation if a worker plugin with the same name is already registered. + If ``True``, the installation is skipped, otherwise the existing plugin is replaced. + Defaults to ``False``. + Examples -------- >>> class ErrorLogger(WorkerPlugin): @@ -240,6 +253,8 @@ class WorkerPlugin: >>> client.register_plugin(plugin) # doctest: +SKIP """ + idempotent: bool = False + def setup(self, worker: Worker) -> None | Awaitable[None]: """ Run when the plugin is attached to a worker. This happens when the plugin is registered @@ -298,6 +313,11 @@ class NannyPlugin: 2. override some of its methods 3. register the plugin using :meth:`Client.register_plugin`. + The ``idempotent`` attribute is used to control whether or not the plugin should + skip installation if a nanny plugin with the same name is already registered. + If ``True``, the installation is skipped, otherwise the existing plugin is replaced. + Defaults to ``False``. + The ``restart`` attribute is used to control whether or not a running ``Worker`` needs to be restarted when registering the plugin. @@ -307,7 +327,8 @@ class NannyPlugin: SchedulerPlugin """ - restart = False + idempotent: bool = False + restart: bool = False def setup(self, nanny): """ diff --git a/distributed/diagnostics/tests/test_nanny_plugin.py b/distributed/diagnostics/tests/test_nanny_plugin.py index ac70f5fd150..de31fbd1f7a 100644 --- a/distributed/diagnostics/tests/test_nanny_plugin.py +++ b/distributed/diagnostics/tests/test_nanny_plugin.py @@ -82,29 +82,6 @@ def setup(self, nanny): assert a.plugins["idempotentplugin"].instance == "first" -@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) -async def test_register_plugin_with_idempotent_keyword(c, s, a): - class IdempotentPlugin(NannyPlugin): - def __init__(self, instance=None): - self.name = "idempotentplugin" - self.instance = instance - - def setup(self, nanny): - if self.instance != "first": - raise RuntimeError( - "Only the first plugin should be started when idempotent is set" - ) - - first = IdempotentPlugin(instance="first") - await c.register_plugin(first, idempotent=True) - assert "idempotentplugin" in a.plugins - - second = IdempotentPlugin(instance="second") - await c.register_plugin(second, idempotent=True) - assert "idempotentplugin" in a.plugins - assert a.plugins["idempotentplugin"].instance == "first" - - @gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) async def test_register_non_idempotent_plugin(c, s, a): class NonIdempotentPlugin(NannyPlugin): @@ -134,7 +111,7 @@ def __init__(self, instance=None): @gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) -async def test_register_plugin_with_idempotent_keyword_overrules_plugin(c, s, a): +async def test_register_plugin_with_idempotent_keyword_is_deprecated(c, s, a): class NonIdempotentPlugin(NannyPlugin): def __init__(self, instance=None): self.name = "nonidempotentplugin" @@ -143,11 +120,13 @@ def __init__(self, instance=None): self.idempotent = True first = NonIdempotentPlugin(instance="first") - await c.register_plugin(first, idempotent=False) + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(first, idempotent=False) assert "nonidempotentplugin" in a.plugins second = NonIdempotentPlugin(instance="second") - await c.register_plugin(second, idempotent=False) + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(second, idempotent=False) assert "nonidempotentplugin" in a.plugins assert a.plugins["nonidempotentplugin"].instance == "second" @@ -165,10 +144,12 @@ def setup(self, nanny): ) first = IdempotentPlugin(instance="first") - await c.register_plugin(first, idempotent=True) + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(first, idempotent=True) assert "idempotentplugin" in a.plugins second = IdempotentPlugin(instance="second") - await c.register_plugin(second, idempotent=True) + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(second, idempotent=True) assert "idempotentplugin" in a.plugins assert a.plugins["idempotentplugin"].instance == "first" diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index 9222ebb567a..52e43ecfeca 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -667,29 +667,6 @@ def start(self, scheduler): assert s.plugins["idempotentplugin"].instance == "first" -@gen_cluster(client=True, nthreads=[]) -async def test_register_plugin_with_idempotent_keyword(c, s): - class IdempotentPlugin(SchedulerPlugin): - def __init__(self, instance=None): - self.name = "idempotentplugin" - self.instance = instance - - def start(self, scheduler): - if self.instance != "first": - raise RuntimeError( - "Only the first plugin should be started when idempotent is set" - ) - - first = IdempotentPlugin(instance="first") - await c.register_plugin(first, idempotent=True) - assert "idempotentplugin" in s.plugins - - second = IdempotentPlugin(instance="second") - await c.register_plugin(second, idempotent=True) - assert "idempotentplugin" in s.plugins - assert s.plugins["idempotentplugin"].instance == "first" - - @gen_cluster(client=True, nthreads=[]) async def test_register_non_idempotent_plugin(c, s): class NonIdempotentPlugin(SchedulerPlugin): @@ -719,7 +696,7 @@ def __init__(self, instance=None): @gen_cluster(client=True, nthreads=[]) -async def test_register_plugin_with_idempotent_keyword_overrules_plugin(c, s): +async def test_register_plugin_with_idempotent_keyword_is_deprecated(c, s): class NonIdempotentPlugin(SchedulerPlugin): def __init__(self, instance=None): self.name = "nonidempotentplugin" @@ -728,11 +705,13 @@ def __init__(self, instance=None): self.idempotent = True first = NonIdempotentPlugin(instance="first") - await c.register_plugin(first, idempotent=False) + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(first, idempotent=False) assert "nonidempotentplugin" in s.plugins second = NonIdempotentPlugin(instance="second") - await c.register_plugin(second, idempotent=False) + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(second, idempotent=False) assert "nonidempotentplugin" in s.plugins assert s.plugins["nonidempotentplugin"].instance == "second" @@ -750,11 +729,13 @@ def start(self, scheduler): ) first = IdempotentPlugin(instance="first") - await c.register_plugin(first, idempotent=True) + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(first, idempotent=True) assert "idempotentplugin" in s.plugins second = IdempotentPlugin(instance="second") - await c.register_plugin(second, idempotent=True) + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(second, idempotent=True) assert "idempotentplugin" in s.plugins assert s.plugins["idempotentplugin"].instance == "first" diff --git a/distributed/diagnostics/tests/test_worker_plugin.py b/distributed/diagnostics/tests/test_worker_plugin.py index 2bb1ac704b5..f0435578a71 100644 --- a/distributed/diagnostics/tests/test_worker_plugin.py +++ b/distributed/diagnostics/tests/test_worker_plugin.py @@ -346,29 +346,6 @@ def setup(self, worker): assert a.plugins["idempotentplugin"].instance == "first" -@gen_cluster(client=True, nthreads=[("", 1)]) -async def test_register_plugin_with_idempotent_keyword(c, s, a): - class IdempotentPlugin(WorkerPlugin): - def __init__(self, instance=None): - self.name = "idempotentplugin" - self.instance = instance - - def setup(self, worker): - if self.instance != "first": - raise RuntimeError( - "Only the first plugin should be started when idempotent is set" - ) - - first = IdempotentPlugin(instance="first") - await c.register_plugin(first, idempotent=True) - assert "idempotentplugin" in a.plugins - - second = IdempotentPlugin(instance="second") - await c.register_plugin(second, idempotent=True) - assert "idempotentplugin" in a.plugins - assert a.plugins["idempotentplugin"].instance == "first" - - @gen_cluster(client=True, nthreads=[("", 1)]) async def test_register_non_idempotent_plugin(c, s, a): class NonIdempotentPlugin(WorkerPlugin): @@ -398,7 +375,7 @@ def __init__(self, instance=None): @gen_cluster(client=True, nthreads=[("", 1)]) -async def test_register_plugin_with_idempotent_keyword_overrules_plugin(c, s, a): +async def test_register_plugin_with_idempotent_keyword_is_deprecated(c, s, a): class NonIdempotentPlugin(WorkerPlugin): def __init__(self, instance=None): self.name = "nonidempotentplugin" @@ -407,11 +384,13 @@ def __init__(self, instance=None): self.idempotent = True first = NonIdempotentPlugin(instance="first") - await c.register_plugin(first, idempotent=False) + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(first, idempotent=False) assert "nonidempotentplugin" in a.plugins second = NonIdempotentPlugin(instance="second") - await c.register_plugin(second, idempotent=False) + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(second, idempotent=False) assert "nonidempotentplugin" in a.plugins assert a.plugins["nonidempotentplugin"].instance == "second" @@ -429,10 +408,12 @@ def setup(self, worker): ) first = IdempotentPlugin(instance="first") - await c.register_plugin(first, idempotent=True) + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(first, idempotent=True) assert "idempotentplugin" in a.plugins second = IdempotentPlugin(instance="second") - await c.register_plugin(second, idempotent=True) + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(second, idempotent=True) assert "idempotentplugin" in a.plugins assert a.plugins["idempotentplugin"].instance == "first" From c90bbc7aa775301973484f617fae0c6834d09009 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 15 Nov 2023 15:07:22 +0100 Subject: [PATCH 5/5] Wording --- distributed/diagnostics/plugin.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py index 259428fb2c6..3f21d969dd0 100644 --- a/distributed/diagnostics/plugin.py +++ b/distributed/diagnostics/plugin.py @@ -48,8 +48,8 @@ class SchedulerPlugin: 3. register the plugin using :meth:`Client.register_plugin`. The ``idempotent`` attribute is used to control whether or not the plugin should - skip installation if a scheduler plugin with the same name is already registered. - If ``True``, the installation is skipped, otherwise the existing plugin is replaced. + be ignored upon registration if a scheduler plugin with the same name already exists. + If ``True``, the plugin is ignored, otherwise the existing plugin is replaced. Defaults to ``False``. Examples @@ -226,8 +226,8 @@ class WorkerPlugin: 3. register the plugin using :meth:`Client.register_plugin`. The ``idempotent`` attribute is used to control whether or not the plugin should - skip installation if a worker plugin with the same name is already registered. - If ``True``, the installation is skipped, otherwise the existing plugin is replaced. + be ignored upon registration if a worker plugin with the same name already exists. + If ``True``, the plugin is ignored, otherwise the existing plugin is replaced. Defaults to ``False``. Examples @@ -314,8 +314,8 @@ class NannyPlugin: 3. register the plugin using :meth:`Client.register_plugin`. The ``idempotent`` attribute is used to control whether or not the plugin should - skip installation if a nanny plugin with the same name is already registered. - If ``True``, the installation is skipped, otherwise the existing plugin is replaced. + be ignored upon registration if a nanny plugin with the same name already exists. + If ``True``, the plugin is ignored, otherwise the existing plugin is replaced. Defaults to ``False``. The ``restart`` attribute is used to control whether or not a running ``Worker``