diff --git a/distributed/actor.py b/distributed/actor.py index 0f6ae566c36..064c1432725 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -122,9 +122,9 @@ def __reduce__(self): @property def _io_loop(self): if self._worker: - return self._worker.io_loop + return self._worker.loop else: - return self._client.io_loop + return self._client.loop @property def _scheduler_rpc(self): diff --git a/distributed/client.py b/distributed/client.py index 115aa1a6be3..2c3112ac5cf 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -48,7 +48,7 @@ except ImportError: single_key = first from tornado import gen -from tornado.ioloop import PeriodicCallback +from tornado.ioloop import IOLoop, PeriodicCallback import distributed.utils from distributed import cluster_dump, preloading @@ -760,6 +760,7 @@ class Client(SyncMethodMixin): _default_event_handlers = {"print": _handle_print, "warn": _handle_warn} preloads: list[preloading.Preload] + __loop: IOLoop | None = None def __init__( self, @@ -872,7 +873,6 @@ def __init__( self._asynchronous = asynchronous self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous) - self.io_loop = self.loop = self._loop_runner.loop self._connecting_to_scheduler = False self._gather_keys = None @@ -944,6 +944,38 @@ def __init__( ReplayTaskClient(self) + @property + def io_loop(self) -> IOLoop | None: + warnings.warn( + "The io_loop property is deprecated", DeprecationWarning, stacklevel=2 + ) + return self.loop + + @io_loop.setter + def io_loop(self, value: IOLoop) -> None: + warnings.warn( + "The io_loop property is deprecated", DeprecationWarning, stacklevel=2 + ) + self.loop = value + + @property + def loop(self) -> IOLoop | None: + loop = self.__loop + if loop is None: + # If the loop is not running when this is called, the LoopRunner.loop + # property will raise a DeprecationWarning + # However subsequent calls might occur - eg atexit, where a stopped + # loop is still acceptable - so we cache access to the loop. + self.__loop = loop = self._loop_runner.loop + return loop + + @loop.setter + def loop(self, value: IOLoop) -> None: + warnings.warn( + "setting the loop property is deprecated", DeprecationWarning, stacklevel=2 + ) + self.__loop = value + @contextmanager def as_current(self): """Thread-local, Task-local context manager that causes the Client.current @@ -1356,7 +1388,7 @@ def __exit__(self, exc_type, exc_value, traceback): def __del__(self): # If the loop never got assigned, we failed early in the constructor, # nothing to do - if hasattr(self, "loop"): + if self.__loop is not None: self.close() def _inc_ref(self, key): diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 64f36958d02..38e19fabb7e 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -9,7 +9,7 @@ from inspect import isawaitable from typing import Any -from tornado.ioloop import PeriodicCallback +from tornado.ioloop import IOLoop, PeriodicCallback import dask.config from dask.utils import _deprecated, format_bytes, parse_timedelta, typename @@ -55,6 +55,7 @@ class Cluster(SyncMethodMixin): """ _supports_scaling = True + __loop: IOLoop | None = None def __init__( self, @@ -65,7 +66,6 @@ def __init__( scheduler_sync_interval=1, ): self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous) - self.loop = self._loop_runner.loop self.scheduler_info = {"workers": {}} self.periodic_callbacks = {} @@ -89,6 +89,26 @@ def __init__( } self.status = Status.created + @property + def loop(self) -> IOLoop | None: + loop = self.__loop + if loop is None: + # If the loop is not running when this is called, the LoopRunner.loop + # property will raise a DeprecationWarning + # However subsequent calls might occur - eg atexit, where a stopped + # loop is still acceptable - so we cache access to the loop. + self.__loop = loop = self._loop_runner.loop + return loop + + @loop.setter + def loop(self, value: IOLoop) -> None: + warnings.warn( + "setting the loop property is deprecated", DeprecationWarning, stacklevel=2 + ) + if value is None: + raise ValueError("expected an IOLoop, got None") + self.__loop = value + @property def name(self): return self._cluster_info["name"] diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index 3388739295f..a5f482b15b6 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -11,6 +11,7 @@ from typing import TYPE_CHECKING, Any, ClassVar from tornado import gen +from tornado.ioloop import IOLoop import dask from dask.utils import parse_bytes, parse_timedelta @@ -230,6 +231,9 @@ def __init__( shutdown_on_close=True, scheduler_sync_interval=1, ): + if loop is None and asynchronous: + loop = IOLoop.current() + self._created = weakref.WeakSet() self.scheduler_spec = copy.copy(scheduler) @@ -259,7 +263,14 @@ def __init__( scheduler_sync_interval=scheduler_sync_interval, ) - if not self.asynchronous: + try: + called_from_running_loop = ( + getattr(loop, "asyncio_loop", None) is asyncio.get_running_loop() + ) + except RuntimeError: + called_from_running_loop = asynchronous + + if not called_from_running_loop: self._loop_runner.start() self.sync(self._start) try: @@ -659,7 +670,7 @@ async def run_spec(spec: dict[str, Any], *args: Any) -> dict[str, Worker | Nanny @atexit.register def close_clusters(): for cluster in list(SpecCluster._instances): - if cluster.shutdown_on_close: + if getattr(cluster, "shutdown_on_close", False): with suppress(gen.TimeoutError, TimeoutError): - if cluster.status != Status.closed: + if getattr(cluster, "status", Status.closed) != Status.closed: cluster.close(timeout=10) diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 2712b12be78..051f551e110 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -289,6 +289,7 @@ async def test_no_more_workers_than_tasks(): assert len(cluster.scheduler.workers) <= 1 +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_basic_no_loop(cleanup): loop = None try: diff --git a/distributed/deploy/tests/test_cluster.py b/distributed/deploy/tests/test_cluster.py index da38e14d393..e89180ab086 100644 --- a/distributed/deploy/tests/test_cluster.py +++ b/distributed/deploy/tests/test_cluster.py @@ -1,6 +1,7 @@ from __future__ import annotations import pytest +from tornado.ioloop import IOLoop from distributed.deploy.cluster import Cluster from distributed.utils_test import gen_test @@ -35,3 +36,19 @@ async def test_logs_deprecated(): async with Cluster(asynchronous=True) as cluster: with pytest.warns(FutureWarning, match="get_logs"): cluster.logs() + + +@gen_test() +async def test_deprecated_loop_properties(): + class ExampleCluster(Cluster): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.loop = self.io_loop = IOLoop.current() + + with pytest.warns(DeprecationWarning) as warninfo: + async with ExampleCluster(asynchronous=True, loop=IOLoop.current()): + pass + + assert [(w.category, *w.message.args) for w in warninfo] == [ + (DeprecationWarning, "setting the loop property is deprecated") + ] diff --git a/distributed/deploy/tests/test_spec_cluster.py b/distributed/deploy/tests/test_spec_cluster.py index a7a1a2f677c..9121abb45ff 100644 --- a/distributed/deploy/tests/test_spec_cluster.py +++ b/distributed/deploy/tests/test_spec_cluster.py @@ -82,6 +82,7 @@ def test_spec_sync(loop): assert result == 11 +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_loop_started_in_constructor(cleanup): # test that SpecCluster.__init__ starts a loop in another thread cluster = SpecCluster(worker_spec, scheduler=scheduler, loop=None) diff --git a/distributed/diagnostics/progressbar.py b/distributed/diagnostics/progressbar.py index 51b8b248d40..72492773b45 100644 --- a/distributed/diagnostics/progressbar.py +++ b/distributed/diagnostics/progressbar.py @@ -3,6 +3,7 @@ import html import logging import sys +import warnings import weakref from contextlib import suppress from timeit import default_timer @@ -111,6 +112,8 @@ def __del__(self): class TextProgressBar(ProgressBar): + __loop: IOLoop | None = None + def __init__( self, keys, @@ -122,14 +125,31 @@ def __init__( start=True, **kwargs, ): + self._loop_runner = loop_runner = LoopRunner(loop=loop) super().__init__(keys, scheduler, interval, complete) self.width = width - self.loop = loop or IOLoop() if start: - loop_runner = LoopRunner(self.loop) loop_runner.run_sync(self.listen) + @property + def loop(self) -> IOLoop | None: + loop = self.__loop + if loop is None: + # If the loop is not running when this is called, the LoopRunner.loop + # property will raise a DeprecationWarning + # However subsequent calls might occur - eg atexit, where a stopped + # loop is still acceptable - so we cache access to the loop. + self.__loop = loop = self._loop_runner.loop + return loop + + @loop.setter + def loop(self, value: IOLoop) -> None: + warnings.warn( + "setting the loop property is deprecated", DeprecationWarning, stacklevel=2 + ) + self.__loop = value + def _draw_bar(self, remaining, all, **kwargs): frac = (1 - remaining / all) if all else 1.0 bar = "#" * int(self.width * frac) diff --git a/distributed/diagnostics/tests/test_progressbar.py b/distributed/diagnostics/tests/test_progressbar.py index 39f7a15bbf9..72c41325c5f 100644 --- a/distributed/diagnostics/tests/test_progressbar.py +++ b/distributed/diagnostics/tests/test_progressbar.py @@ -2,6 +2,9 @@ from time import sleep +import pytest +from tornado.ioloop import IOLoop + from distributed.diagnostics.progressbar import TextProgressBar, progress from distributed.metrics import time from distributed.utils_test import div, gen_cluster, inc @@ -72,3 +75,18 @@ def test_progress_function_w_kwargs(client, capsys): progress(f, interval="20ms", loop=client.loop) check_bar_completed(capsys) + + +@gen_cluster(client=True, nthreads=[]) +async def test_deprecated_loop_properties(c, s): + class ExampleTextProgressBar(TextProgressBar): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.loop = self.io_loop = IOLoop.current() + + with pytest.warns(DeprecationWarning) as warninfo: + ExampleTextProgressBar(client=c, keys=[], start=False, loop=IOLoop.current()) + + assert [(w.category, *w.message.args) for w in warninfo] == [ + (DeprecationWarning, "setting the loop property is deprecated") + ] diff --git a/distributed/semaphore.py b/distributed/semaphore.py index a88bc0af778..83bbf29c862 100644 --- a/distributed/semaphore.py +++ b/distributed/semaphore.py @@ -377,7 +377,7 @@ def __init__( except ValueError: client = get_client() self.scheduler = scheduler_rpc or client.scheduler - self.loop = loop or client.io_loop + self.loop = loop or client.loop self.name = name or "semaphore-" + uuid.uuid4().hex self.max_leases = max_leases diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 3625caa163c..aebdecc496b 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -31,6 +31,7 @@ import pytest import yaml from tlz import concat, first, identity, isdistinct, merge, pluck, valmap +from tornado.ioloop import IOLoop import dask import dask.bag as db @@ -2873,6 +2874,7 @@ async def test_startup_close_startup(s, a, b): await c.close() +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_startup_close_startup_sync(loop): with cluster() as (s, [a, b]): with Client(s["address"], loop=loop) as c: @@ -5532,13 +5534,18 @@ async def test_future_auto_inform(c, s, a, b): await client.close() +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_client_async_before_loop_starts(cleanup): async def close(): async with client: pass with pristine_loop() as loop: - client = Client(asynchronous=True, loop=loop) + with pytest.warns( + DeprecationWarning, + match=r"Constructing LoopRunner\(loop=loop\) without a running loop is deprecated", + ): + client = Client(asynchronous=True, loop=loop) assert client.asynchronous assert isinstance(client.close(), NoOpAwaitable) loop.run_sync(close) # TODO: client.close() does not unset global client @@ -6837,6 +6844,7 @@ async def test_workers_collection_restriction(c, s, a, b): assert a.data and not b.data +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) async def test_get_client_functions_spawn_clusters(c, s, a): # see gh4565 @@ -7550,3 +7558,21 @@ def test_quiet_close_process(processes, tmp_path): assert not out assert not err + + +@gen_cluster(client=False, nthreads=[]) +async def test_deprecated_loop_properties(s): + class ExampleClient(Client): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.loop = self.io_loop = IOLoop.current() + + with pytest.warns(DeprecationWarning) as warninfo: + async with ExampleClient(s.address, asynchronous=True, loop=IOLoop.current()): + pass + + assert [(w.category, *w.message.args) for w in warninfo] == [ + (DeprecationWarning, "setting the loop property is deprecated"), + (DeprecationWarning, "The io_loop property is deprecated"), + (DeprecationWarning, "setting the loop property is deprecated"), + ] diff --git a/distributed/tests/test_client_loop.py b/distributed/tests/test_client_loop.py index 723031145cc..35bb3a7e861 100644 --- a/distributed/tests/test_client_loop.py +++ b/distributed/tests/test_client_loop.py @@ -2,6 +2,8 @@ import contextlib +import pytest + from distributed import Client, LocalCluster from distributed.utils import LoopRunner @@ -27,12 +29,14 @@ def _check_cluster_and_client_loop(loop): # Test if Client stops LoopRunner on close. +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_close_loop_sync_start_new_loop(cleanup): with _check_loop_runner(): _check_cluster_and_client_loop(loop=None) # Test if Client stops LoopRunner on close. +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_close_loop_sync_use_running_loop(cleanup): with _check_loop_runner(): # Start own loop or use current thread's one. diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index 70f33b03d00..853ba578f11 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -387,25 +387,48 @@ def assert_not_running(loop): q.get(timeout=0.02) +_loop_not_running_property_warning = functools.partial( + pytest.warns, + DeprecationWarning, + match=r"Accessing the loop property while the loop is not running is deprecated", +) +_explicit_loop_is_not_running_warning = functools.partial( + pytest.warns, + DeprecationWarning, + match=r"Constructing LoopRunner\(loop=loop\) without a running loop is deprecated", +) +_implicit_loop_is_not_running_warning = functools.partial( + pytest.warns, + DeprecationWarning, + match=r"Constructing a LoopRunner\(asynchronous=True\) without a running loop is deprecated", +) + + +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_loop_runner(loop_in_thread): # Implicit loop loop = IOLoop() loop.make_current() runner = LoopRunner() - assert runner.loop not in (loop, loop_in_thread) + with _loop_not_running_property_warning(): + assert runner.loop not in (loop, loop_in_thread) assert not runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) runner.start() assert runner.is_started() assert_running(runner.loop) runner.stop() assert not runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) # Explicit loop loop = IOLoop() - runner = LoopRunner(loop=loop) - assert runner.loop is loop + with _explicit_loop_is_not_running_warning(): + runner = LoopRunner(loop=loop) + with _loop_not_running_property_warning(): + assert runner.loop is loop assert not runner.is_started() assert_not_running(loop) runner.start() @@ -429,38 +452,51 @@ def test_loop_runner(loop_in_thread): # Implicit loop, asynchronous=True loop = IOLoop() loop.make_current() - runner = LoopRunner(asynchronous=True) - assert runner.loop is loop + with _implicit_loop_is_not_running_warning(): + runner = LoopRunner(asynchronous=True) + with _loop_not_running_property_warning(): + assert runner.loop is loop assert not runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) runner.start() assert runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) runner.stop() assert not runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) # Explicit loop, asynchronous=True loop = IOLoop() - runner = LoopRunner(loop=loop, asynchronous=True) - assert runner.loop is loop + with _explicit_loop_is_not_running_warning(): + runner = LoopRunner(loop=loop, asynchronous=True) + with _loop_not_running_property_warning(): + assert runner.loop is loop assert not runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) runner.start() assert runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) runner.stop() assert not runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_two_loop_runners(loop_in_thread): # Loop runners tied to the same loop should cooperate # ABCCBA loop = IOLoop() - a = LoopRunner(loop=loop) - b = LoopRunner(loop=loop) + with _explicit_loop_is_not_running_warning(): + a = LoopRunner(loop=loop) + with _explicit_loop_is_not_running_warning(): + b = LoopRunner(loop=loop) assert_not_running(loop) a.start() assert_running(loop) @@ -478,8 +514,10 @@ def test_two_loop_runners(loop_in_thread): # ABCABC loop = IOLoop() - a = LoopRunner(loop=loop) - b = LoopRunner(loop=loop) + with _explicit_loop_is_not_running_warning(): + a = LoopRunner(loop=loop) + with _explicit_loop_is_not_running_warning(): + b = LoopRunner(loop=loop) assert_not_running(loop) a.start() assert_running(loop) diff --git a/distributed/utils.py b/distributed/utils.py index 35db2fdab1a..654b6f96f5a 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -433,12 +433,26 @@ class LoopRunner: def __init__(self, loop=None, asynchronous=False): if loop is None: if asynchronous: + try: + asyncio.get_running_loop() + except RuntimeError: + warnings.warn( + "Constructing a LoopRunner(asynchronous=True) without a running loop is deprecated", + DeprecationWarning, + stacklevel=2, + ) self._loop = IOLoop.current() else: # We're expecting the loop to run in another thread, # avoid re-using this thread's assigned loop self._loop = IOLoop() else: + if not loop.asyncio_loop.is_running(): + warnings.warn( + "Constructing LoopRunner(loop=loop) without a running loop is deprecated", + DeprecationWarning, + stacklevel=2, + ) self._loop = loop self._asynchronous = asynchronous self._loop_thread = None @@ -569,6 +583,13 @@ def run_sync(self, func, *args, **kwargs): @property def loop(self): + loop = self._loop + if not loop.asyncio_loop.is_running(): + warnings.warn( + "Accessing the loop property while the loop is not running is deprecated", + DeprecationWarning, + stacklevel=2, + ) return self._loop diff --git a/distributed/utils_test.py b/distributed/utils_test.py index fadc6002376..8728bcd8447 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -71,7 +71,6 @@ iscoroutinefunction, log_errors, reset_logger_locks, - sync, ) from distributed.worker import WORKER_ANY_RUNNING, Worker from distributed.worker_state_machine import ( @@ -144,36 +143,8 @@ async def cleanup_global_workers(): @pytest.fixture -def loop(cleanup): - with check_instances(): - with pristine_loop() as loop: - # Monkey-patch IOLoop.start to wait for loop stop - orig_start = loop.start - is_stopped = threading.Event() - is_stopped.set() - - def start(): - is_stopped.clear() - try: - orig_start() - finally: - is_stopped.set() - - loop.start = start - - yield loop - - # Stop the loop in case it's still running - try: - sync(loop, cleanup_global_workers, callback_timeout=0.500) - loop.add_callback(loop.stop) - except RuntimeError as e: - if not re.match("IOLoop is clos(ed|ing)", str(e)): - raise - except asyncio.TimeoutError: - pass - else: - is_stopped.wait() +def loop(loop_in_thread): + return loop_in_thread @pytest.fixture diff --git a/setup.cfg b/setup.cfg index 5e36d379991..a56a679cb8e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -55,7 +55,6 @@ addopts = filterwarnings = error ignore:Please use `dok_matrix` from the `scipy\.sparse` namespace, the `scipy\.sparse\.dok` namespace is deprecated.:DeprecationWarning - ignore:There is no current event loop:DeprecationWarning ignore:elementwise comparison failed. this will raise an error in the future:DeprecationWarning ignore:unclosed