diff --git a/distributed/client.py b/distributed/client.py index ae9d90e9e84..9d03ad85bc8 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -4834,7 +4834,7 @@ def register_plugin( self, plugin: NannyPlugin | SchedulerPlugin | WorkerPlugin, name: str | None = None, - idempotent: bool = False, + idempotent: bool | None = None, ): """Register a plugin. @@ -4849,11 +4849,21 @@ def register_plugin( plugin instance or automatically generated if not present. idempotent : Do not re-register if a plugin of the given name already exists. + If None, ``plugin.idempotent`` is taken if defined, False otherwise. """ if name is None: name = _get_plugin_name(plugin) assert name - + if idempotent is not None: + warnings.warn( + "The `idempotent` argument is deprecated and will be removed in a " + "future version. Please mark your plugin as idempotent by setting its " + "`.idempotent` attribute to `True`.", + FutureWarning, + ) + else: + idempotent = getattr(plugin, "idempotent", False) + assert isinstance(idempotent, bool) return self._register_plugin(plugin, name, idempotent) @singledispatchmethod diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py index e58d4deec77..3f21d969dd0 100644 --- a/distributed/diagnostics/plugin.py +++ b/distributed/diagnostics/plugin.py @@ -32,7 +32,8 @@ class SchedulerPlugin: """Interface to extend the Scheduler - A plugin enables custom hooks to run when specific events occur. The scheduler will run the methods of this plugin whenever the corresponding + A plugin enables custom hooks to run when specific events occur. + The scheduler will run the methods of this plugin whenever the corresponding method of the scheduler is run. This runs user code within the scheduler thread that can perform arbitrary operations in synchrony with the scheduler itself. @@ -46,6 +47,11 @@ class SchedulerPlugin: 2. override some of its methods 3. register the plugin using :meth:`Client.register_plugin`. 
+ The ``idempotent`` attribute is used to control whether or not the plugin should + be ignored upon registration if a scheduler plugin with the same name already exists. + If ``True``, the plugin is ignored, otherwise the existing plugin is replaced. + Defaults to ``False``. + Examples -------- >>> class Counter(SchedulerPlugin): @@ -63,6 +69,8 @@ class SchedulerPlugin: >>> scheduler.add_plugin(plugin) # doctest: +SKIP """ + idempotent: bool = False + async def start(self, scheduler: Scheduler) -> None: """Run when the scheduler starts up @@ -217,6 +225,11 @@ class WorkerPlugin: 2. override some of its methods 3. register the plugin using :meth:`Client.register_plugin`. + The ``idempotent`` attribute is used to control whether or not the plugin should + be ignored upon registration if a worker plugin with the same name already exists. + If ``True``, the plugin is ignored, otherwise the existing plugin is replaced. + Defaults to ``False``. + Examples -------- >>> class ErrorLogger(WorkerPlugin): @@ -240,6 +253,8 @@ class WorkerPlugin: >>> client.register_plugin(plugin) # doctest: +SKIP """ + idempotent: bool = False + def setup(self, worker: Worker) -> None | Awaitable[None]: """ Run when the plugin is attached to a worker. This happens when the plugin is registered @@ -298,6 +313,11 @@ class NannyPlugin: 2. override some of its methods 3. register the plugin using :meth:`Client.register_plugin`. + The ``idempotent`` attribute is used to control whether or not the plugin should + be ignored upon registration if a nanny plugin with the same name already exists. + If ``True``, the plugin is ignored, otherwise the existing plugin is replaced. + Defaults to ``False``. + The ``restart`` attribute is used to control whether or not a running ``Worker`` needs to be restarted when registering the plugin. 
@@ -307,7 +327,8 @@ class NannyPlugin: SchedulerPlugin """ - restart = False + idempotent: bool = False + restart: bool = False def setup(self, nanny): """ @@ -413,7 +434,10 @@ async def start(self, scheduler: Scheduler) -> None: self.installer, self.restart_workers, self.name ) await scheduler.register_worker_plugin( - comm=None, plugin=dumps(worker_plugin), name=self.name + comm=None, + plugin=dumps(worker_plugin), + name=self.name, + idempotent=True, ) else: logger.info( diff --git a/distributed/diagnostics/tests/test_nanny_plugin.py b/distributed/diagnostics/tests/test_nanny_plugin.py index cded39d2cd0..de31fbd1f7a 100644 --- a/distributed/diagnostics/tests/test_nanny_plugin.py +++ b/distributed/diagnostics/tests/test_nanny_plugin.py @@ -3,6 +3,7 @@ import pytest from distributed import Nanny, NannyPlugin +from distributed.protocol.pickle import dumps from distributed.utils_test import gen_cluster @@ -58,11 +59,12 @@ def teardown(self, nanny): @gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) -async def test_register_idempotent_plugins(c, s, a): +async def test_register_idempotent_plugin(c, s, a): class IdempotentPlugin(NannyPlugin): def __init__(self, instance=None): self.name = "idempotentplugin" self.instance = instance + self.idempotent = True def setup(self, nanny): if self.instance != "first": @@ -71,27 +73,83 @@ def setup(self, nanny): ) first = IdempotentPlugin(instance="first") - await c.register_plugin(first, idempotent=True) + await c.register_plugin(first) assert "idempotentplugin" in a.plugins second = IdempotentPlugin(instance="second") - await c.register_plugin(second, idempotent=True) + await c.register_plugin(second) assert "idempotentplugin" in a.plugins assert a.plugins["idempotentplugin"].instance == "first" @gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) -async def test_register_non_idempotent_plugins(c, s, a): +async def test_register_non_idempotent_plugin(c, s, a): class NonIdempotentPlugin(NannyPlugin): def 
__init__(self, instance=None): self.name = "nonidempotentplugin" self.instance = instance first = NonIdempotentPlugin(instance="first") - await c.register_plugin(first, idempotent=False) + await c.register_plugin(first) assert "nonidempotentplugin" in a.plugins second = NonIdempotentPlugin(instance="second") - await c.register_plugin(second, idempotent=False) + await c.register_plugin(second) assert "nonidempotentplugin" in a.plugins assert a.plugins["nonidempotentplugin"].instance == "second" + + third = NonIdempotentPlugin(instance="third") + with pytest.warns( + FutureWarning, + match="`Scheduler.register_nanny_plugin` now requires `idempotent`", + ): + await s.register_nanny_plugin( + comm=None, plugin=dumps(third), name="nonidempotentplugin" + ) + assert "nonidempotentplugin" in a.plugins + assert a.plugins["nonidempotentplugin"].instance == "third" + + +@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) +async def test_register_plugin_with_idempotent_keyword_is_deprecated(c, s, a): + class NonIdempotentPlugin(NannyPlugin): + def __init__(self, instance=None): + self.name = "nonidempotentplugin" + self.instance = instance + # We want to overrule this + self.idempotent = True + + first = NonIdempotentPlugin(instance="first") + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(first, idempotent=False) + assert "nonidempotentplugin" in a.plugins + + second = NonIdempotentPlugin(instance="second") + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(second, idempotent=False) + assert "nonidempotentplugin" in a.plugins + assert a.plugins["nonidempotentplugin"].instance == "second" + + class IdempotentPlugin(NannyPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + # We want to overrule this + self.idempotent = False + + def setup(self, nanny): + if self.instance != "first": + raise RuntimeError( 
+ "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(first, idempotent=True) + assert "idempotentplugin" in a.plugins + + second = IdempotentPlugin(instance="second") + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(second, idempotent=True) + assert "idempotentplugin" in a.plugins + assert a.plugins["idempotentplugin"].instance == "first" diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index 49c37149caf..52e43ecfeca 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -426,7 +426,7 @@ def __init__(self): self.name = "plugin" plugin = Plugin() - await s.register_scheduler_plugin(plugin=dumps(plugin)) + await s.register_scheduler_plugin(plugin=dumps(plugin), idempotent=False) assert "plugin" in s.plugins await s.unregister_scheduler_plugin(name="plugin") @@ -478,7 +478,7 @@ class MyPlugin(SchedulerPlugin): async def start(self, scheduler: Scheduler) -> None: scheduler._foo = "bar" # type: ignore - await s.register_scheduler_plugin(MyPlugin()) + await s.register_scheduler_plugin(MyPlugin(), idempotent=False) assert s._foo == "bar" @@ -499,8 +499,8 @@ async def before_close(self): async def close(self): raise Exception("AFTER_CLOSE") - await s.register_scheduler_plugin(OK()) - await s.register_scheduler_plugin(Bad()) + await s.register_scheduler_plugin(OK(), idempotent=False) + await s.register_scheduler_plugin(Bad(), idempotent=False) with captured_logger("distributed.scheduler") as logger: await s.close() @@ -641,3 +641,149 @@ def stop(self, scheduler): await c.register_worker_plugin(DuckPlugin(), nanny=True) assert len(s.plugins) == n_existing_plugins + 1 assert s.foo == 123 + + 
+@gen_cluster(client=True, nthreads=[]) +async def test_register_idempotent_plugin(c, s): + class IdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + self.idempotent = True + + def start(self, scheduler): + if self.instance != "first": + raise RuntimeError( + "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + await c.register_plugin(first) + assert "idempotentplugin" in s.plugins + + second = IdempotentPlugin(instance="second") + await c.register_plugin(second) + assert "idempotentplugin" in s.plugins + assert s.plugins["idempotentplugin"].instance == "first" + + +@gen_cluster(client=True, nthreads=[]) +async def test_register_non_idempotent_plugin(c, s): + class NonIdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "nonidempotentplugin" + self.instance = instance + + first = NonIdempotentPlugin(instance="first") + await c.register_plugin(first) + assert "nonidempotentplugin" in s.plugins + + second = NonIdempotentPlugin(instance="second") + await c.register_plugin(second) + assert "nonidempotentplugin" in s.plugins + assert s.plugins["nonidempotentplugin"].instance == "second" + + third = NonIdempotentPlugin(instance="third") + with pytest.warns( + FutureWarning, + match="`Scheduler.register_scheduler_plugin` now requires `idempotent`", + ): + await s.register_scheduler_plugin( + plugin=dumps(third), name="nonidempotentplugin" + ) + assert "nonidempotentplugin" in s.plugins + assert s.plugins["nonidempotentplugin"].instance == "third" + + +@gen_cluster(client=True, nthreads=[]) +async def test_register_plugin_with_idempotent_keyword_is_deprecated(c, s): + class NonIdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "nonidempotentplugin" + self.instance = instance + # We want to overrule this + self.idempotent = True + + first = 
NonIdempotentPlugin(instance="first") + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(first, idempotent=False) + assert "nonidempotentplugin" in s.plugins + + second = NonIdempotentPlugin(instance="second") + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(second, idempotent=False) + assert "nonidempotentplugin" in s.plugins + assert s.plugins["nonidempotentplugin"].instance == "second" + + class IdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + # We want to overrule this + self.idempotent = False + + def start(self, scheduler): + if self.instance != "first": + raise RuntimeError( + "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(first, idempotent=True) + assert "idempotentplugin" in s.plugins + + second = IdempotentPlugin(instance="second") + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(second, idempotent=True) + assert "idempotentplugin" in s.plugins + assert s.plugins["idempotentplugin"].instance == "first" + + +@gen_cluster(nthreads=[]) +async def test_register_idempotent_plugins_directly(s): + class IdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + + def start(self, scheduler): + if self.instance != "first": + raise RuntimeError( + "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + await s.register_scheduler_plugin(plugin=dumps(first), idempotent=True) + assert "idempotentplugin" in s.plugins + + second = IdempotentPlugin(instance="second") + await 
s.register_scheduler_plugin(plugin=dumps(second), idempotent=True) + assert "idempotentplugin" in s.plugins + assert s.plugins["idempotentplugin"].instance == "first" + + +@gen_cluster(nthreads=[]) +async def test_register_non_idempotent_plugins_directly(s): + class NonIdempotentPlugin(SchedulerPlugin): + def __init__(self, instance=None): + self.name = "nonidempotentplugin" + self.instance = instance + + first = NonIdempotentPlugin(instance="first") + await s.register_scheduler_plugin(plugin=dumps(first), idempotent=False) + assert "nonidempotentplugin" in s.plugins + + second = NonIdempotentPlugin(instance="second") + await s.register_scheduler_plugin(plugin=dumps(second), idempotent=False) + assert "nonidempotentplugin" in s.plugins + assert s.plugins["nonidempotentplugin"].instance == "second" + + third = NonIdempotentPlugin(instance="third") + with pytest.warns( + FutureWarning, + match="`Scheduler.register_scheduler_plugin` now requires `idempotent`", + ): + await s.register_scheduler_plugin(plugin=dumps(third)) + assert "nonidempotentplugin" in s.plugins + assert s.plugins["nonidempotentplugin"].instance == "third" diff --git a/distributed/diagnostics/tests/test_worker_plugin.py b/distributed/diagnostics/tests/test_worker_plugin.py index 64fffd42498..f0435578a71 100644 --- a/distributed/diagnostics/tests/test_worker_plugin.py +++ b/distributed/diagnostics/tests/test_worker_plugin.py @@ -6,6 +6,7 @@ import pytest from distributed import Worker, WorkerPlugin +from distributed.protocol.pickle import dumps from distributed.utils_test import async_poll_for, gen_cluster, inc @@ -322,11 +323,12 @@ def teardown(self, worker): @gen_cluster(client=True, nthreads=[("", 1)]) -async def test_register_idempotent_plugins(c, s, a): +async def test_register_idempotent_plugin(c, s, a): class IdempotentPlugin(WorkerPlugin): def __init__(self, instance=None): self.name = "idempotentplugin" self.instance = instance + self.idempotent = True def setup(self, worker): if 
self.instance != "first": @@ -335,27 +337,83 @@ def setup(self, worker): ) first = IdempotentPlugin(instance="first") - await c.register_plugin(first, idempotent=True) + await c.register_plugin(first) assert "idempotentplugin" in a.plugins second = IdempotentPlugin(instance="second") - await c.register_plugin(second, idempotent=True) + await c.register_plugin(second) assert "idempotentplugin" in a.plugins assert a.plugins["idempotentplugin"].instance == "first" @gen_cluster(client=True, nthreads=[("", 1)]) -async def test_register_non_idempotent_plugins(c, s, a): +async def test_register_non_idempotent_plugin(c, s, a): class NonIdempotentPlugin(WorkerPlugin): def __init__(self, instance=None): self.name = "nonidempotentplugin" self.instance = instance first = NonIdempotentPlugin(instance="first") - await c.register_plugin(first, idempotent=False) + await c.register_plugin(first) assert "nonidempotentplugin" in a.plugins second = NonIdempotentPlugin(instance="second") - await c.register_plugin(second, idempotent=False) + await c.register_plugin(second) assert "nonidempotentplugin" in a.plugins assert a.plugins["nonidempotentplugin"].instance == "second" + + third = NonIdempotentPlugin(instance="third") + with pytest.warns( + FutureWarning, + match="`Scheduler.register_worker_plugin` now requires `idempotent`", + ): + await s.register_worker_plugin( + comm=None, plugin=dumps(third), name="nonidempotentplugin" + ) + assert "nonidempotentplugin" in a.plugins + assert a.plugins["nonidempotentplugin"].instance == "third" + + +@gen_cluster(client=True, nthreads=[("", 1)]) +async def test_register_plugin_with_idempotent_keyword_is_deprecated(c, s, a): + class NonIdempotentPlugin(WorkerPlugin): + def __init__(self, instance=None): + self.name = "nonidempotentplugin" + self.instance = instance + # We want to overrule this + self.idempotent = True + + first = NonIdempotentPlugin(instance="first") + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): 
+ await c.register_plugin(first, idempotent=False) + assert "nonidempotentplugin" in a.plugins + + second = NonIdempotentPlugin(instance="second") + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(second, idempotent=False) + assert "nonidempotentplugin" in a.plugins + assert a.plugins["nonidempotentplugin"].instance == "second" + + class IdempotentPlugin(WorkerPlugin): + def __init__(self, instance=None): + self.name = "idempotentplugin" + self.instance = instance + # We want to overrule this + self.idempotent = False + + def setup(self, worker): + if self.instance != "first": + raise RuntimeError( + "Only the first plugin should be started when idempotent is set" + ) + + first = IdempotentPlugin(instance="first") + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(first, idempotent=True) + assert "idempotentplugin" in a.plugins + + second = IdempotentPlugin(instance="second") + with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): + await c.register_plugin(second, idempotent=True) + assert "idempotentplugin" in a.plugins + assert a.plugins["idempotentplugin"].instance == "first" diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 6c1e08ea513..04da9bc1d11 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5824,7 +5824,7 @@ async def register_scheduler_plugin( self, plugin: bytes | SchedulerPlugin, name: str | None = None, - idempotent: bool = False, + idempotent: bool | None = None, ) -> None: """Register a plugin on the scheduler.""" if not dask.config.get("distributed.scheduler.pickle"): @@ -5834,6 +5834,14 @@ async def register_scheduler_plugin( "arbitrary bytestrings using pickle via the " "'distributed.scheduler.pickle' configuration setting." ) + if idempotent is None: + warnings.warn( + "The signature of `Scheduler.register_scheduler_plugin` now requires " + "`idempotent`. 
Not including `idempotent` in the signature will no longer " + "be supported in future versions.", + FutureWarning, + ) + idempotent = False if not isinstance(plugin, SchedulerPlugin): plugin = loads(plugin) assert isinstance(plugin, SchedulerPlugin) @@ -7539,10 +7547,18 @@ def stop_task_metadata(self, name: str | None = None) -> dict: return {"metadata": plugin.metadata, "state": plugin.state} async def register_worker_plugin( - self, comm: None, plugin: bytes, name: str, idempotent: bool = False + self, comm: None, plugin: bytes, name: str, idempotent: bool | None = None ) -> dict[str, OKMessage]: """Registers a worker plugin on all running and future workers""" logger.info("Registering Worker plugin %s", name) + if idempotent is None: + warnings.warn( + "The signature of `Scheduler.register_worker_plugin` now requires " + "`idempotent`. Not including `idempotent` in the signature will no longer " + "be supported in future versions.", + FutureWarning, + ) + idempotent = False if name in self.worker_plugins and idempotent: return {} @@ -7566,10 +7582,20 @@ async def unregister_worker_plugin( return responses async def register_nanny_plugin( - self, comm: None, plugin: bytes, name: str, idempotent: bool = False + self, comm: None, plugin: bytes, name: str, idempotent: bool | None = None ) -> dict[str, OKMessage]: """Registers a nanny plugin on all running and future nannies""" logger.info("Registering Nanny plugin %s", name) + + if idempotent is None: + warnings.warn( + "The signature of `Scheduler.register_nanny_plugin` now requires " + "`idempotent`. 
Not including `idempotent` in the signature will no longer " + "be supported in future versions.", + FutureWarning, + ) + idempotent = False + if name in self.nanny_plugins and idempotent: return {} diff --git a/distributed/shuffle/_scheduler_plugin.py b/distributed/shuffle/_scheduler_plugin.py index ea3864d9134..2617abe65b7 100644 --- a/distributed/shuffle/_scheduler_plugin.py +++ b/distributed/shuffle/_scheduler_plugin.py @@ -69,7 +69,7 @@ def __init__(self, scheduler: Scheduler): async def start(self, scheduler: Scheduler) -> None: worker_plugin = ShuffleWorkerPlugin() await self.scheduler.register_worker_plugin( - None, dumps(worker_plugin), name="shuffle" + None, dumps(worker_plugin), name="shuffle", idempotent=False ) def shuffle_ids(self) -> set[ShuffleId]: diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 8ff3ab092e6..df42b88cc7b 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4176,46 +4176,6 @@ async def test_dump_cluster_state(s, *workers, format): fs.rm("state-dumps", recursive=True) -@gen_cluster(nthreads=[]) -async def test_idempotent_plugins(s): - class IdempotentPlugin(SchedulerPlugin): - def __init__(self, instance=None): - self.name = "idempotentplugin" - self.instance = instance - - def start(self, scheduler): - if self.instance != "first": - raise RuntimeError( - "Only the first plugin should be started when idempotent is set" - ) - - first = IdempotentPlugin(instance="first") - await s.register_scheduler_plugin(plugin=dumps(first), idempotent=True) - assert "idempotentplugin" in s.plugins - - second = IdempotentPlugin(instance="second") - await s.register_scheduler_plugin(plugin=dumps(second), idempotent=True) - assert "idempotentplugin" in s.plugins - assert s.plugins["idempotentplugin"].instance == "first" - - -@gen_cluster(nthreads=[]) -async def test_non_idempotent_plugins(s): - class NonIdempotentPlugin(SchedulerPlugin): - def __init__(self, 
instance=None): - self.name = "nonidempotentplugin" - self.instance = instance - - first = NonIdempotentPlugin(instance="first") - await s.register_scheduler_plugin(plugin=dumps(first), idempotent=False) - assert "nonidempotentplugin" in s.plugins - - second = NonIdempotentPlugin(instance="second") - await s.register_scheduler_plugin(plugin=dumps(second), idempotent=False) - assert "nonidempotentplugin" in s.plugins - assert s.plugins["nonidempotentplugin"].instance == "second" - - @gen_cluster(nthreads=[("", 1)]) async def test_repr(s, a): async with Worker(s.address, nthreads=2) as b: # name = address by default