From 4a391cbc6ba095f36a3913b6125fa3c692942c7b Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Mon, 30 May 2022 12:05:46 +0100 Subject: [PATCH 1/3] deprecate the io_loop and loop kwarg to Server, Worker, and Nanny passing anything other than IOLoop.current() is already unsupported: see https://github.com/dask/distributed/pull/6443#issuecomment-1139531109 --- distributed/core.py | 10 ++++-- .../diagnostics/tests/test_worker_plugin.py | 6 ++-- distributed/nanny.py | 14 ++++++--- distributed/scheduler.py | 3 +- distributed/tests/test_client.py | 2 +- distributed/tests/test_nanny.py | 12 +++---- distributed/tests/test_scheduler.py | 4 +-- distributed/tests/test_steal.py | 14 ++------- distributed/tests/test_worker.py | 31 ++++++++++++++----- distributed/utils_test.py | 4 +-- distributed/worker.py | 22 ++++++++----- 11 files changed, 73 insertions(+), 49 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index 2a2a12521ff..f0bc6b219e2 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -162,6 +162,13 @@ def __init__( timeout=None, io_loop=None, ): + if io_loop is not None: + warnings.warn( + "The io_loop kwarg to Server is deprecated", + DeprecationWarning, + stacklevel=2, + ) + self._status = Status.init self.handlers = { "identity": self.identity, @@ -191,8 +198,7 @@ def __init__( self._event_finished = asyncio.Event() self.listeners = [] - self.io_loop = io_loop or IOLoop.current() - self.loop = self.io_loop + self.io_loop = self.loop = IOLoop.current() if not hasattr(self.io_loop, "profile"): ref = weakref.ref(self.io_loop) diff --git a/distributed/diagnostics/tests/test_worker_plugin.py b/distributed/diagnostics/tests/test_worker_plugin.py index 62074de8ed2..202d0aee5d3 100644 --- a/distributed/diagnostics/tests/test_worker_plugin.py +++ b/distributed/diagnostics/tests/test_worker_plugin.py @@ -42,7 +42,7 @@ def transition(self, key, start, finish, **kwargs): async def test_create_with_client(c, s): await c.register_worker_plugin(MyPlugin(123)) - worker = await Worker(s.address, loop=s.loop) + worker = await Worker(s.address) assert worker._my_plugin_status == "setup" assert worker._my_plugin_data == 123 @@ -55,7 +55,7 @@ async def test_remove_with_client(c, s): await c.register_worker_plugin(MyPlugin(123), name="foo") await c.register_worker_plugin(MyPlugin(546), name="bar") - worker = await Worker(s.address, loop=s.loop) + worker = await Worker(s.address) # remove the 'foo' plugin await c.unregister_worker_plugin("foo") assert worker._my_plugin_status == "teardown" @@ -79,7 +79,7 @@ async def test_remove_with_client(c, s): async def test_remove_with_client_raises(c, s): await c.register_worker_plugin(MyPlugin(123), name="foo") - worker = await Worker(s.address, loop=s.loop) + worker = await Worker(s.address) with pytest.raises(ValueError, match="bar"): await c.unregister_worker_plugin("bar") diff --git a/distributed/nanny.py b/distributed/nanny.py index f23b78f8e58..5f77f2cdcf8 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -134,8 +134,16 @@ def __init__( config=None, **worker_kwargs, ): + if loop is not None: + warnings.warn( + "the `loop` kwarg to `Nanny` is deprecated, and will be removed in a future release. " + "the Nanny always binds to the current loop.", + DeprecationWarning, + stacklevel=2, + ) + self._setup_logging(logger) - self.loop = loop or IOLoop.current() + self.loop = self.io_loop = IOLoop.current() if isinstance(security, dict): security = Security(**security) @@ -246,9 +254,7 @@ def __init__( self.plugins: dict[str, NannyPlugin] = {} - super().__init__( - handlers=handlers, io_loop=self.loop, connection_args=self.connection_args - ) + super().__init__(handlers=handlers, connection_args=self.connection_args) self.scheduler = self.rpc(self.scheduler_addr) self.memory_manager = NannyMemoryManager(self, memory_limit=memory_limit) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 048661ac2f2..84b008dddd6 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2889,7 +2889,7 @@ def __init__( stacklevel=2, ) - self.loop = IOLoop.current() + self.loop = self.io_loop = IOLoop.current() self._setup_logging(logger) # Attributes @@ -3124,7 +3124,6 @@ def __init__( self, handlers=self.handlers, stream_handlers=merge(worker_handlers, client_handlers), - io_loop=self.loop, connection_limit=connection_limit, deserialize=False, connection_args=self.connection_args, diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index b82f36e27f5..98fc81a41f5 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -2980,7 +2980,7 @@ async def test_unrunnable_task_runs(c, s, a, b): assert s.tasks[x.key] in s.unrunnable assert s.get_task_status(keys=[x.key]) == {x.key: "no-worker"} - w = await Worker(s.address, loop=s.loop) + w = await Worker(s.address) while x.status != "finished": await asyncio.sleep(0.01) diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index c3588d625b9..dfa58a83579 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -275,8 +275,8 @@ async def test_wait_for_scheduler(): @gen_cluster(nthreads=[], client=True) async def test_environment_variable(c, s): - a = Nanny(s.address, loop=s.loop, memory_limit=0, env={"FOO": "123"}) - b = Nanny(s.address, loop=s.loop, memory_limit=0, env={"FOO": "456"}) + a = Nanny(s.address, memory_limit=0, env={"FOO": "123"}) + b = Nanny(s.address, memory_limit=0, env={"FOO": "456"}) await asyncio.gather(a, b) results = await c.run(lambda: os.environ["FOO"]) assert results == {a.worker_address: "123", b.worker_address: "456"} @@ -288,7 +288,7 @@ async def test_environment_variable_by_config(c, s, monkeypatch): with dask.config.set({"distributed.nanny.environ": "456"}): with pytest.raises(TypeError, match="configuration must be of type dict"): - Nanny(s.address, loop=s.loop, memory_limit=0) + Nanny(s.address, memory_limit=0) with dask.config.set({"distributed.nanny.environ": {"FOO": "456"}}): @@ -296,10 +296,10 @@ async def test_environment_variable_by_config(c, s, monkeypatch): # kwargs > env var > config with mock.patch.dict(os.environ, {"FOO": "BAR"}, clear=True): - a = Nanny(s.address, loop=s.loop, memory_limit=0, env={"FOO": "123"}) - x = Nanny(s.address, loop=s.loop, memory_limit=0) + a = Nanny(s.address, memory_limit=0, env={"FOO": "123"}) + x = Nanny(s.address, memory_limit=0) - b = Nanny(s.address, loop=s.loop, memory_limit=0) + b = Nanny(s.address, memory_limit=0) await asyncio.gather(a, b, x) results = await c.run(lambda: os.environ["FOO"]) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index e304a3959ec..2d4c32b53ea 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -952,7 +952,7 @@ async def test_file_descriptors(c, s): num_fds_1 = proc.num_fds() N = 20 - nannies = await asyncio.gather(*(Nanny(s.address, loop=s.loop) for _ in range(N))) + nannies = await asyncio.gather(*(Nanny(s.address) for _ in range(N))) while len(s.workers) < N: await asyncio.sleep(0.1) @@ -2220,7 +2220,7 @@ async def test_worker_name_collision(s, a): with raises_with_cause( RuntimeError, None, ValueError, f"name taken, {a.name!r}" ): - await Worker(s.address, name=a.name, loop=s.loop, host="127.0.0.1") + await Worker(s.address, name=a.name, host="127.0.0.1") s.validate_state() assert set(s.workers) == {a.address} diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 4b68f927596..8221465c59c 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -324,7 +324,7 @@ async def test_new_worker_steals(c, s, a): while len(a.tasks) < 10: await asyncio.sleep(0.01) - b = await Worker(s.address, loop=s.loop, nthreads=1, memory_limit=MEMORY_LIMIT) + b = await Worker(s.address, nthreads=1, memory_limit=MEMORY_LIMIT) result = await total assert result == sum(map(inc, range(100))) @@ -478,7 +478,7 @@ async def test_steal_resource_restrictions(c, s, a): await asyncio.sleep(0.01) assert len(a.tasks) == 101 - b = await Worker(s.address, loop=s.loop, nthreads=1, resources={"A": 4}) + b = await Worker(s.address, nthreads=1, resources={"A": 4}) while not b.tasks or len(a.tasks) == 101: await asyncio.sleep(0.01) @@ -500,15 +500,7 @@ async def test_steal_resource_restrictions_asym_diff(c, s, a): await asyncio.sleep(0.01) assert len(a.tasks) == 101 - b = await Worker( - s.address, - loop=s.loop, - nthreads=1, - resources={ - "A": 4, - "B": 5, - }, - ) + b = await Worker(s.address, nthreads=1, resources={"A": 4, "B": 5}) while not b.tasks or len(a.tasks) == 101: await asyncio.sleep(0.01) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 94ae05b8905..1cb9225488b 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -19,6 +19,7 @@ import psutil import pytest from tlz import first, pluck, sliding_window +from tornado.ioloop import IOLoop import dask from dask import delayed @@ -38,7 +39,7 @@ wait, ) from distributed.comm.registry import backends -from distributed.compatibility import LINUX, WINDOWS +from distributed.compatibility import LINUX, WINDOWS, to_thread from distributed.core import CommClosedError, Status, rpc from distributed.diagnostics import nvml from distributed.diagnostics.plugin import PipInstall @@ -524,8 +525,22 @@ async def test_gather_missing_workers_replicated(c, s, a, b, missing_first): @gen_cluster(nthreads=[]) async def test_io_loop(s): - async with Worker(s.address, loop=s.loop) as w: - assert w.io_loop is s.loop + async with Worker(s.address) as w: + assert w.io_loop is w.loop is s.loop + + +@gen_cluster(nthreads=[]) +async def test_io_loop_alternate_loop(s, loop): + async def main(): + with pytest.warns( + DeprecationWarning, + match=r"The `loop` argument to `Worker` is deprecated, and will be " + r"removed in a future release. The Worker always binds to the current loop", + ): + async with Worker(s.address, loop=loop) as w: + assert w.io_loop is w.loop is IOLoop.current() + + await to_thread(asyncio.run, main()) @gen_cluster(client=True) @@ -1004,7 +1019,7 @@ async def test_worker_fds(s): proc = psutil.Process() before = psutil.Process().num_fds() - async with Worker(s.address, loop=s.loop): + async with Worker(s.address): assert proc.num_fds() > before while proc.num_fds() > before: @@ -1182,7 +1197,7 @@ def func(dask_scheduler): @gen_cluster(nthreads=[], client=True) async def test_scheduler_address_config(c, s): with dask.config.set({"scheduler-address": s.address}): - worker = await Worker(loop=s.loop) + worker = await Worker() assert worker.scheduler.address == s.address await worker.close() @@ -1276,7 +1291,7 @@ def test_startup2(): assert list(result.values()) == [False] * 2 # Start a worker and check that startup is not run - worker = await Worker(s.address, loop=s.loop) + worker = await Worker(s.address) result = await c.run(test_import, workers=[worker.address]) assert list(result.values()) == [False] await worker.close() @@ -1290,7 +1305,7 @@ def test_startup2(): assert list(result.values()) == [True] * 2 # Start a worker and check it is ran on it - worker = await Worker(s.address, loop=s.loop) + worker = await Worker(s.address) result = await c.run(test_import, workers=[worker.address]) assert list(result.values()) == [True] await worker.close() @@ -1304,7 +1319,7 @@ def test_startup2(): assert list(result.values()) == [True] * 2 # Start a worker and check it is ran on it - worker = await Worker(s.address, loop=s.loop) + worker = await Worker(s.address) result = await c.run(test_import, workers=[worker.address]) assert list(result.values()) == [True] result = await c.run(test_startup2, workers=[worker.address]) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index edd74bef17e..7c4eb6d0874 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -891,8 +891,9 @@ async def start_cluster( scheduler_kwargs: dict[str, Any] = {}, worker_kwargs: dict[str, Any] = {}, ) -> tuple[Scheduler, list[ServerNode]]: + if loop is not None: + warnings.warn("the loop kwarg to start_cluster is deprecated") s = await Scheduler( - loop=loop, validate=True, security=security, port=0, @@ -906,7 +907,6 @@ async def start_cluster( nthreads=ncore[1], name=i, security=security, - loop=loop, validate=True, host=ncore[0], **( diff --git a/distributed/worker.py b/distributed/worker.py index c146cb56fff..bafb37bc1ac 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -351,7 +351,6 @@ class Worker(ServerNode): data: MutableMapping, type, None The object to use for storage, builds a disk-backed LRU dict by default nthreads: int, optional - loop: tornado.ioloop.IOLoop local_directory: str, optional Directory where we place local resources name: str, optional @@ -514,7 +513,7 @@ def __init__( *, scheduler_file: str | None = None, nthreads: int | None = None, - loop: IOLoop | None = None, + loop: IOLoop | None = None, # Deprecated local_dir: None = None, # Deprecated, use local_directory instead local_directory: str | None = None, services: dict | None = None, @@ -579,6 +578,13 @@ def __init__( DeprecationWarning, stacklevel=2, ) + if loop is not None: + warnings.warn( + "The `loop` argument to `Worker` is deprecated, and will be removed in a future release. " + "The Worker always binds to the current loop", + DeprecationWarning, + stacklevel=2, + ) self.tasks = {} self.waiting_for_data_count = 0 self.has_what = defaultdict(set) @@ -729,7 +735,7 @@ def __init__( self.connection_args = self.security.get_connection_args("worker") self.actors = {} - self.loop = loop or IOLoop.current() + self.loop = self.io_loop = IOLoop.current() # Common executors always available self.executors = { @@ -819,7 +825,6 @@ def __init__( super().__init__( handlers=handlers, stream_handlers=stream_handlers, - io_loop=self.loop, connection_args=self.connection_args, **kwargs, ) @@ -878,7 +883,7 @@ def __init__( if lifetime is None: lifetime = dask.config.get("distributed.worker.lifetime.duration") - self.lifetime = parse_timedelta(lifetime) + lifetime = parse_timedelta(lifetime) if lifetime_stagger is None: lifetime_stagger = dask.config.get("distributed.worker.lifetime.stagger") @@ -888,9 +893,10 @@ def __init__( lifetime_restart = dask.config.get("distributed.worker.lifetime.restart") self.lifetime_restart = lifetime_restart - if self.lifetime: - self.lifetime += (random.random() * 2 - 1) * lifetime_stagger - self.io_loop.call_later(self.lifetime, self.close_gracefully) + if lifetime: + lifetime += (random.random() * 2 - 1) * lifetime_stagger + self.io_loop.call_later(lifetime, self.close_gracefully) + self.lifetime = lifetime self._async_instructions = set() From d0b35e0655f98c02daf698412890cc99d7f9d303 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Wed, 1 Jun 2022 09:02:22 +0100 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: Gabe Joseph --- distributed/core.py | 2 +- distributed/nanny.py | 4 ++-- distributed/tests/test_worker.py | 2 +- distributed/worker.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index f0bc6b219e2..59fe1ab909e 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -164,7 +164,7 @@ def __init__( ): if io_loop is not None: warnings.warn( - "The io_loop kwarg to Server is deprecated", + "The io_loop kwarg to Server is ignored and will be deprecated", DeprecationWarning, stacklevel=2, ) diff --git a/distributed/nanny.py b/distributed/nanny.py index 5f77f2cdcf8..dfb00bb308c 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -136,8 +136,8 @@ def __init__( ): if loop is not None: warnings.warn( - "the `loop` kwarg to `Nanny` is deprecated, and will be removed in a future release. " - "the Nanny always binds to the current loop.", + "the `loop` kwarg to `Nanny` is ignored, and will be removed in a future release. " + "The Nanny always binds to the current loop.", DeprecationWarning, stacklevel=2, ) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 1cb9225488b..e3f2cfda296 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -534,7 +534,7 @@ async def test_io_loop_alternate_loop(s, loop): async def main(): with pytest.warns( DeprecationWarning, - match=r"The `loop` argument to `Worker` is deprecated, and will be " + match=r"The `loop` argument to `Worker` is ignored, and will be " r"removed in a future release. The Worker always binds to the current loop", ): async with Worker(s.address, loop=loop) as w: diff --git a/distributed/worker.py b/distributed/worker.py index bafb37bc1ac..d43fd4adcc8 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -580,7 +580,7 @@ def __init__( ) if loop is not None: warnings.warn( - "The `loop` argument to `Worker` is deprecated, and will be removed in a future release. " + "The `loop` argument to `Worker` is ignored, and will be removed in a future release. " "The Worker always binds to the current loop", DeprecationWarning, stacklevel=2, From 476f40b696e240c46e18a8684a0dd1f3501387ab Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Wed, 1 Jun 2022 10:43:08 +0100 Subject: [PATCH 3/3] remove loop kwarg from utils_test.start_cluster --- distributed/utils_test.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 7c4eb6d0874..f95158e2042 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -885,14 +885,11 @@ def test_func(*args, **kwargs): async def start_cluster( nthreads: list[tuple[str, int] | tuple[str, int, dict]], scheduler_addr: str, - loop: IOLoop | None = None, security: Security | dict[str, Any] | None = None, Worker: type[ServerNode] = Worker, scheduler_kwargs: dict[str, Any] = {}, worker_kwargs: dict[str, Any] = {}, ) -> tuple[Scheduler, list[ServerNode]]: - if loop is not None: - warnings.warn("the loop kwarg to start_cluster is deprecated") s = await Scheduler( validate=True, security=security,