Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4834,7 +4834,7 @@ def register_plugin(
self,
plugin: NannyPlugin | SchedulerPlugin | WorkerPlugin,
name: str | None = None,
idempotent: bool = False,
idempotent: bool | None = None,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the kwarg frankly feels a bit like an anti pattern. I think the plugin should know whether it is idempotent or not.

This is also a question about precedence. If the plugin has this attribute set, does it make sense for a user to overwrite it?

):
"""Register a plugin.

Expand All @@ -4849,11 +4849,21 @@ def register_plugin(
plugin instance or automatically generated if not present.
idempotent :
Do not re-register if a plugin of the given name already exists.
If None, ``plugin.idempotent`` is taken if defined, False otherwise.
"""
if name is None:
name = _get_plugin_name(plugin)
assert name

if idempotent is not None:
warnings.warn(
"The `idempotent` argument is deprecated and will be removed in a "
"future version. Please mark your plugin as idempotent by setting its "
"`.idempotent` atrribute to `True`.",
FutureWarning,
)
else:
idempotent = getattr(plugin, "idempotent", False)
assert isinstance(idempotent, bool)
return self._register_plugin(plugin, name, idempotent)

@singledispatchmethod
Expand Down
30 changes: 27 additions & 3 deletions distributed/diagnostics/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
class SchedulerPlugin:
"""Interface to extend the Scheduler

A plugin enables custom hooks to run when specific events occur. The scheduler will run the methods of this plugin whenever the corresponding
A plugin enables custom hooks to run when specific events occur.
The scheduler will run the methods of this plugin whenever the corresponding
method of the scheduler is run. This runs user code within the scheduler
thread that can perform arbitrary operations in synchrony with the scheduler
itself.
Expand All @@ -46,6 +47,11 @@ class SchedulerPlugin:
2. override some of its methods
3. register the plugin using :meth:`Client.register_plugin<distributed.Client.register_plugin>`.

The ``idempotent`` attribute is used to control whether or not the plugin should
be ignored upon registration if a scheduler plugin with the same name already exists.
If ``True``, the plugin is ignored, otherwise the existing plugin is replaced.
Defaults to ``False``.

Examples
--------
>>> class Counter(SchedulerPlugin):
Expand All @@ -63,6 +69,8 @@ class SchedulerPlugin:
>>> scheduler.add_plugin(plugin) # doctest: +SKIP
"""

idempotent: bool = False

async def start(self, scheduler: Scheduler) -> None:
"""Run when the scheduler starts up

Expand Down Expand Up @@ -217,6 +225,11 @@ class WorkerPlugin:
2. override some of its methods
3. register the plugin using :meth:`Client.register_plugin<distributed.Client.register_plugin>`.

The ``idempotent`` attribute is used to control whether or not the plugin should
be ignored upon registration if a worker plugin with the same name already exists.
If ``True``, the plugin is ignored, otherwise the existing plugin is replaced.
Defaults to ``False``.

Examples
--------
>>> class ErrorLogger(WorkerPlugin):
Expand All @@ -240,6 +253,8 @@ class WorkerPlugin:
>>> client.register_plugin(plugin) # doctest: +SKIP
"""

idempotent: bool = False

def setup(self, worker: Worker) -> None | Awaitable[None]:
"""
Run when the plugin is attached to a worker. This happens when the plugin is registered
Expand Down Expand Up @@ -298,6 +313,11 @@ class NannyPlugin:
2. override some of its methods
3. register the plugin using :meth:`Client.register_plugin<distributed.Client.register_plugin>`.

The ``idempotent`` attribute is used to control whether or not the plugin should
be ignored upon registration if a nanny plugin with the same name already exists.
If ``True``, the plugin is ignored, otherwise the existing plugin is replaced.
Defaults to ``False``.

The ``restart`` attribute is used to control whether or not a running ``Worker``
needs to be restarted when registering the plugin.

Expand All @@ -307,7 +327,8 @@ class NannyPlugin:
SchedulerPlugin
"""

restart = False
idempotent: bool = False
restart: bool = False

def setup(self, nanny):
"""
Expand Down Expand Up @@ -413,7 +434,10 @@ async def start(self, scheduler: Scheduler) -> None:
self.installer, self.restart_workers, self.name
)
await scheduler.register_worker_plugin(
comm=None, plugin=dumps(worker_plugin), name=self.name
comm=None,
plugin=dumps(worker_plugin),
name=self.name,
idempotent=True,
)
else:
logger.info(
Expand Down
70 changes: 64 additions & 6 deletions distributed/diagnostics/tests/test_nanny_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

from distributed import Nanny, NannyPlugin
from distributed.protocol.pickle import dumps
from distributed.utils_test import gen_cluster


Expand Down Expand Up @@ -58,11 +59,12 @@ def teardown(self, nanny):


@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny)
async def test_register_idempotent_plugins(c, s, a):
async def test_register_idempotent_plugin(c, s, a):
class IdempotentPlugin(NannyPlugin):
def __init__(self, instance=None):
self.name = "idempotentplugin"
self.instance = instance
self.idempotent = True

def setup(self, nanny):
if self.instance != "first":
Expand All @@ -71,27 +73,83 @@ def setup(self, nanny):
)

first = IdempotentPlugin(instance="first")
await c.register_plugin(first, idempotent=True)
await c.register_plugin(first)
assert "idempotentplugin" in a.plugins

second = IdempotentPlugin(instance="second")
await c.register_plugin(second, idempotent=True)
await c.register_plugin(second)
assert "idempotentplugin" in a.plugins
assert a.plugins["idempotentplugin"].instance == "first"


@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny)
async def test_register_non_idempotent_plugins(c, s, a):
async def test_register_non_idempotent_plugin(c, s, a):
class NonIdempotentPlugin(NannyPlugin):
def __init__(self, instance=None):
self.name = "nonidempotentplugin"
self.instance = instance

first = NonIdempotentPlugin(instance="first")
await c.register_plugin(first, idempotent=False)
await c.register_plugin(first)
assert "nonidempotentplugin" in a.plugins

second = NonIdempotentPlugin(instance="second")
await c.register_plugin(second, idempotent=False)
await c.register_plugin(second)
assert "nonidempotentplugin" in a.plugins
assert a.plugins["nonidempotentplugin"].instance == "second"

third = NonIdempotentPlugin(instance="third")
with pytest.warns(
FutureWarning,
match="`Scheduler.register_nanny_plugin` now requires `idempotent`",
):
await s.register_nanny_plugin(
comm=None, plugin=dumps(third), name="nonidempotentplugin"
)
assert "nonidempotentplugin" in a.plugins
assert a.plugins["nonidempotentplugin"].instance == "third"


@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny)
async def test_register_plugin_with_idempotent_keyword_is_deprecated(c, s, a):
class NonIdempotentPlugin(NannyPlugin):
def __init__(self, instance=None):
self.name = "nonidempotentplugin"
self.instance = instance
# We want to overrule this
self.idempotent = True

first = NonIdempotentPlugin(instance="first")
with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"):
await c.register_plugin(first, idempotent=False)
assert "nonidempotentplugin" in a.plugins

second = NonIdempotentPlugin(instance="second")
with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"):
await c.register_plugin(second, idempotent=False)
assert "nonidempotentplugin" in a.plugins
assert a.plugins["nonidempotentplugin"].instance == "second"

class IdempotentPlugin(NannyPlugin):
def __init__(self, instance=None):
self.name = "idempotentplugin"
self.instance = instance
# We want to overrule this
self.idempotent = False

def setup(self, nanny):
if self.instance != "first":
raise RuntimeError(
"Only the first plugin should be started when idempotent is set"
)

first = IdempotentPlugin(instance="first")
with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"):
await c.register_plugin(first, idempotent=True)
assert "idempotentplugin" in a.plugins

second = IdempotentPlugin(instance="second")
with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"):
await c.register_plugin(second, idempotent=True)
assert "idempotentplugin" in a.plugins
assert a.plugins["idempotentplugin"].instance == "first"
Loading