From bd6c7ec3d01b7029da0fd11ee18acbcf220bb0d7 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Mon, 23 May 2022 12:43:15 +0100 Subject: [PATCH 01/12] deprecate incorrect usage of LoopRunner --- distributed/client.py | 12 +++++++++++- distributed/deploy/cluster.py | 5 ++++- distributed/deploy/spec.py | 13 ++++++++++++- distributed/utils.py | 21 +++++++++++++++++++++ 4 files changed, 48 insertions(+), 3 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index e63a3173502..5d7736dbb15 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -872,7 +872,6 @@ def __init__( self._asynchronous = asynchronous self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous) - self.io_loop = self.loop = self._loop_runner.loop self._connecting_to_scheduler = False self._gather_keys = None @@ -944,6 +943,17 @@ def __init__( ReplayTaskClient(self) + @property + def io_loop(self): + warnings.warn( + "The io_loop property is deprecated", DeprecationWarning, stacklevel=2 + ) + return self._loop_runner.loop + + @property + def loop(self): + return self._loop_runner.loop + @contextmanager def as_current(self): """Thread-local, Task-local context manager that causes the Client.current diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index eded4c20026..37c675ae8ba 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -66,7 +66,6 @@ def __init__( scheduler_sync_interval=1, ): self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous) - self.loop = self._loop_runner.loop self.scheduler_info = {"workers": {}} self.periodic_callbacks = {} @@ -92,6 +91,10 @@ def __init__( } self.status = Status.created + @property + def loop(self): + return self._loop_runner.loop + @property def name(self): return self._cluster_info["name"] diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index 3388739295f..4934f38d81a 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -11,6 +11,7 @@ from typing import TYPE_CHECKING, Any, ClassVar from tornado import gen +from tornado.ioloop import IOLoop import dask from dask.utils import parse_bytes, parse_timedelta @@ -230,6 +231,9 @@ def __init__( shutdown_on_close=True, scheduler_sync_interval=1, ): + if loop is None and asynchronous: + loop = IOLoop.current() + self._created = weakref.WeakSet() self.scheduler_spec = copy.copy(scheduler) @@ -259,7 +263,14 @@ def __init__( scheduler_sync_interval=scheduler_sync_interval, ) - if not self.asynchronous: + try: + called_from_running_loop = ( + getattr(loop, "asyncio_loop", None) is asyncio.get_running_loop() + ) + except RuntimeError: + called_from_running_loop = asynchronous + + if not called_from_running_loop: self._loop_runner.start() self.sync(self._start) try: diff --git a/distributed/utils.py b/distributed/utils.py index 35db2fdab1a..654b6f96f5a 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -433,12 +433,26 @@ class LoopRunner: def __init__(self, loop=None, asynchronous=False): if loop is None: if asynchronous: + try: + asyncio.get_running_loop() + except RuntimeError: + warnings.warn( + "Constructing a LoopRunner(asynchronous=True) without a running loop is deprecated", + DeprecationWarning, + stacklevel=2, + ) self._loop = IOLoop.current() else: # We're expecting the loop to run in another thread, # avoid re-using this thread's assigned loop self._loop = IOLoop() else: + if not loop.asyncio_loop.is_running(): + warnings.warn( + "Constructing LoopRunner(loop=loop) without a running loop is deprecated", + DeprecationWarning, + stacklevel=2, + ) self._loop = loop self._asynchronous = asynchronous self._loop_thread = None @@ -569,6 +583,13 @@ def run_sync(self, func, *args, **kwargs): @property def loop(self): + loop = self._loop + if not loop.asyncio_loop.is_running(): + warnings.warn( + "Accessing the loop property while the loop is not running is deprecated", + DeprecationWarning, + stacklevel=2, + ) return self._loop From dd0debd2df679e6ea4b7e9da1f13c6678d7aa282 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 24 May 2022 12:54:10 +0100 Subject: [PATCH 02/12] avoid accessing self.loop in Client --- distributed/client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 5d7736dbb15..6af19b2b33e 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -1366,8 +1366,10 @@ def __exit__(self, exc_type, exc_value, traceback): def __del__(self): # If the loop never got assigned, we failed early in the constructor, # nothing to do - if hasattr(self, "loop"): - self.close() + if getattr(self, "status", "newly-created") in ["closed", "newly-created"]: + return + + self.close() def _inc_ref(self, key): with self._refcount_lock: From 865e62a491d2548a896613ec8aaceae8d50a9c3c Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 24 May 2022 12:54:40 +0100 Subject: [PATCH 03/12] run loop() fixtures' loop in a thread --- distributed/utils_test.py | 33 ++------------------------------- 1 file changed, 2 insertions(+), 31 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 966182a1559..7aa6cc0680a 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -71,7 +71,6 @@ iscoroutinefunction, log_errors, reset_logger_locks, - sync, ) from distributed.worker import WORKER_ANY_RUNNING, Worker from distributed.worker_state_machine import ( @@ -144,36 +143,8 @@ async def cleanup_global_workers(): @pytest.fixture -def loop(cleanup): - with check_instances(): - with pristine_loop() as loop: - # Monkey-patch IOLoop.start to wait for loop stop - orig_start = loop.start - is_stopped = threading.Event() - is_stopped.set() - - def start(): - is_stopped.clear() - try: - orig_start() - finally: - is_stopped.set() - - loop.start = start - - yield loop - - # Stop the loop in case it's still running - try: - sync(loop, cleanup_global_workers, callback_timeout=0.500) - loop.add_callback(loop.stop) - except RuntimeError as e: - if not re.match("IOLoop is clos(ed|ing)", str(e)): - raise - except asyncio.TimeoutError: - pass - else: - is_stopped.wait() +def loop(loop_in_thread): + return loop_in_thread @pytest.fixture From 5d615d66ffc7a4fa37d1ec6bf34e94aab6429052 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 24 May 2022 13:31:54 +0100 Subject: [PATCH 04/12] cache the loop property on first access --- distributed/client.py | 38 ++++++++++++++++++++++++++--------- distributed/deploy/cluster.py | 23 ++++++++++++++++++--- 2 files changed, 49 insertions(+), 12 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 6af19b2b33e..a9c65b68b66 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -48,7 +48,7 @@ except ImportError: single_key = first from tornado import gen -from tornado.ioloop import PeriodicCallback +from tornado.ioloop import IOLoop, PeriodicCallback import distributed.utils from distributed import cluster_dump, preloading @@ -760,6 +760,7 @@ class Client(SyncMethodMixin): _default_event_handlers = {"print": _handle_print, "warn": _handle_warn} preloads: list[preloading.Preload] + __loop: IOLoop | None = None def __init__( self, @@ -944,15 +945,36 @@ def __init__( ReplayTaskClient(self) @property - def io_loop(self): + def io_loop(self) -> IOLoop | None: warnings.warn( "The io_loop property is deprecated", DeprecationWarning, stacklevel=2 ) - return self._loop_runner.loop + return self.loop + + @io_loop.setter + def io_loop(self, value: IOLoop) -> None: + warnings.warn( + "The io_loop property is deprecated", DeprecationWarning, stacklevel=2 + ) + self.loop = value @property - def loop(self): - return self._loop_runner.loop + def loop(self) -> IOLoop | None: + loop = self.__loop + if loop is None: + # If the loop is not running when this is called, the LoopRunner.loop + # property will raise a DeprecationWarning + # However subsequent calls might occur - eg atexit, where a stopped + # loop is still acceptable - so we cache access to the loop. + self.__loop = loop = self._loop_runner.loop + return loop + + @loop.setter + def loop(self, value: IOLoop) -> None: + warnings.warn( + "setting the loop property is deprecated", DeprecationWarning, stacklevel=2 + ) + self.__loop = value @contextmanager def as_current(self): @@ -1366,10 +1388,8 @@ def __exit__(self, exc_type, exc_value, traceback): def __del__(self): # If the loop never got assigned, we failed early in the constructor, # nothing to do - if getattr(self, "status", "newly-created") in ["closed", "newly-created"]: - return - - self.close() + if self.__loop is not None: + self.close() def _inc_ref(self, key): with self._refcount_lock: diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 37c675ae8ba..ce414800f47 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -9,7 +9,7 @@ from inspect import isawaitable from typing import Any -from tornado.ioloop import PeriodicCallback +from tornado.ioloop import IOLoop, PeriodicCallback import dask.config from dask.utils import _deprecated, format_bytes, parse_timedelta, typename @@ -56,6 +56,7 @@ class Cluster(SyncMethodMixin): _supports_scaling = True _cluster_info: dict = {} + __loop: IOLoop | None = None def __init__( self, @@ -92,8 +93,24 @@ def __init__( self.status = Status.created @property - def loop(self): - return self._loop_runner.loop + def loop(self) -> IOLoop | None: + loop = self.__loop + if loop is None: + # If the loop is not running when this is called, the LoopRunner.loop + # property will raise a DeprecationWarning + # However subsequent calls might occur - eg atexit, where a stopped + # loop is still acceptable - so we cache access to the loop. + self.__loop = loop = self._loop_runner.loop + return loop + + @loop.setter + def loop(self, value: IOLoop) -> None: + warnings.warn( + "setting the loop property is deprecated", DeprecationWarning, stacklevel=2 + ) + if value is None: + raise ValueError("expected an IOLoop, got None") + self.__loop = value @property def name(self): From 208f67a97e87cc67d814183b86a2330eaaa840fb Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 24 May 2022 15:11:27 +0100 Subject: [PATCH 05/12] avoid deprecated io_loop property --- distributed/actor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/actor.py b/distributed/actor.py index 0f6ae566c36..064c1432725 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -122,9 +122,9 @@ def __reduce__(self): @property def _io_loop(self): if self._worker: - return self._worker.io_loop + return self._worker.loop else: - return self._client.io_loop + return self._client.loop @property def _scheduler_rpc(self): From ab1cd033af0239dbfedebffaa0010b7a4bfaff04 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 24 May 2022 15:12:17 +0100 Subject: [PATCH 06/12] avoid deprecated use of LoopRunner in TextProgressBar --- distributed/diagnostics/progressbar.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/distributed/diagnostics/progressbar.py b/distributed/diagnostics/progressbar.py index 51b8b248d40..72492773b45 100644 --- a/distributed/diagnostics/progressbar.py +++ b/distributed/diagnostics/progressbar.py @@ -3,6 +3,7 @@ import html import logging import sys +import warnings import weakref from contextlib import suppress from timeit import default_timer @@ -111,6 +112,8 @@ def __del__(self): class TextProgressBar(ProgressBar): + __loop: IOLoop | None = None + def __init__( self, keys, @@ -122,14 +125,31 @@ def __init__( start=True, **kwargs, ): + self._loop_runner = loop_runner = LoopRunner(loop=loop) super().__init__(keys, scheduler, interval, complete) self.width = width - self.loop = loop or IOLoop() if start: - loop_runner = LoopRunner(self.loop) loop_runner.run_sync(self.listen) + @property + def loop(self) -> IOLoop | None: + loop = self.__loop + if loop is None: + # If the loop is not running when this is called, the LoopRunner.loop + # property will raise a DeprecationWarning + # However subsequent calls might occur - eg atexit, where a stopped + # loop is still acceptable - so we cache access to the loop. + self.__loop = loop = self._loop_runner.loop + return loop + + @loop.setter + def loop(self, value: IOLoop) -> None: + warnings.warn( + "setting the loop property is deprecated", DeprecationWarning, stacklevel=2 + ) + self.__loop = value + def _draw_bar(self, remaining, all, **kwargs): frac = (1 - remaining / all) if all else 1.0 bar = "#" * int(self.width * frac) From 5072a209bf55671bdc4d6c7baa0765f1564a555d Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 24 May 2022 15:34:17 +0100 Subject: [PATCH 07/12] test_client_async_before_loop_starts now intentionally warns a DeprecationWarning --- distributed/tests/test_client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index c8c849eea79..81d74ee3105 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -5543,7 +5543,11 @@ async def close(): pass with pristine_loop() as loop: - client = Client(asynchronous=True, loop=loop) + with pytest.warns( + DeprecationWarning, + match=r"Constructing LoopRunner\(loop=loop\) without a running loop is deprecated", + ): + client = Client(asynchronous=True, loop=loop) assert client.asynchronous assert isinstance(client.close(), NoOpAwaitable) loop.run_sync(close) # TODO: client.close() does not unset global client From f8bb44041da6568a194f061f86502747dc122c80 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 24 May 2022 18:20:48 +0100 Subject: [PATCH 08/12] ignore deprecation warnings in LoopRunner tests --- distributed/semaphore.py | 2 +- distributed/tests/test_utils.py | 74 ++++++++++++++++++++++++--------- 2 files changed, 56 insertions(+), 20 deletions(-) diff --git a/distributed/semaphore.py b/distributed/semaphore.py index a88bc0af778..83bbf29c862 100644 --- a/distributed/semaphore.py +++ b/distributed/semaphore.py @@ -377,7 +377,7 @@ def __init__( except ValueError: client = get_client() self.scheduler = scheduler_rpc or client.scheduler - self.loop = loop or client.io_loop + self.loop = loop or client.loop self.name = name or "semaphore-" + uuid.uuid4().hex self.max_leases = max_leases diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index 70f33b03d00..53f0802aa61 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -387,25 +387,47 @@ def assert_not_running(loop): q.get(timeout=0.02) +_loop_not_running_property_warning = functools.partial( + pytest.warns, + DeprecationWarning, + match=r"Accessing the loop property while the loop is not running is deprecated", +) +_explicit_loop_is_not_running_warning = functools.partial( + pytest.warns, + DeprecationWarning, + match=r"Constructing LoopRunner\(loop=loop\) without a running loop is deprecated", +) +_implicit_loop_is_not_running_warning = functools.partial( + pytest.warns, + DeprecationWarning, + match=r"Constructing a LoopRunner\(asynchronous=True\) without a running loop is deprecated", +) + + def test_loop_runner(loop_in_thread): # Implicit loop loop = IOLoop() loop.make_current() runner = LoopRunner() - assert runner.loop not in (loop, loop_in_thread) + with _loop_not_running_property_warning(): + assert runner.loop not in (loop, loop_in_thread) assert not runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) runner.start() assert runner.is_started() assert_running(runner.loop) runner.stop() assert not runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) # Explicit loop loop = IOLoop() - runner = LoopRunner(loop=loop) - assert runner.loop is loop + with _explicit_loop_is_not_running_warning(): + runner = LoopRunner(loop=loop) + with _loop_not_running_property_warning(): + assert runner.loop is loop assert not runner.is_started() assert_not_running(loop) runner.start() @@ -429,29 +451,39 @@ def test_loop_runner(loop_in_thread): # Implicit loop, asynchronous=True loop = IOLoop() loop.make_current() - runner = LoopRunner(asynchronous=True) - assert runner.loop is loop + with _implicit_loop_is_not_running_warning(): + runner = LoopRunner(asynchronous=True) + with _loop_not_running_property_warning(): + assert runner.loop is loop assert not runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) runner.start() assert runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) runner.stop() assert not runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) # Explicit loop, asynchronous=True loop = IOLoop() - runner = LoopRunner(loop=loop, asynchronous=True) - assert runner.loop is loop + with _explicit_loop_is_not_running_warning(): + runner = LoopRunner(loop=loop, asynchronous=True) + with _loop_not_running_property_warning(): + assert runner.loop is loop assert not runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) runner.start() assert runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) runner.stop() assert not runner.is_started() - assert_not_running(runner.loop) + with _loop_not_running_property_warning(): + assert_not_running(runner.loop) def test_two_loop_runners(loop_in_thread): @@ -459,8 +491,10 @@ def test_two_loop_runners(loop_in_thread): # ABCCBA loop = IOLoop() - a = LoopRunner(loop=loop) - b = LoopRunner(loop=loop) + with _explicit_loop_is_not_running_warning(): + a = LoopRunner(loop=loop) + with _explicit_loop_is_not_running_warning(): + b = LoopRunner(loop=loop) assert_not_running(loop) a.start() assert_running(loop) @@ -478,8 +512,10 @@ def test_two_loop_runners(loop_in_thread): # ABCABC loop = IOLoop() - a = LoopRunner(loop=loop) - b = LoopRunner(loop=loop) + with _explicit_loop_is_not_running_warning(): + a = LoopRunner(loop=loop) + with _explicit_loop_is_not_running_warning(): + b = LoopRunner(loop=loop) assert_not_running(loop) a.start() assert_running(loop) From a056847cbfc828177eb2d69acd219d1c0dd71971 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 31 May 2022 13:46:56 +0100 Subject: [PATCH 09/12] unignore deprecated asyncio.get_event_loop() --- setup.cfg | 1 - 1 file changed, 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index bc76731dde2..40acc606e2e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -55,7 +55,6 @@ addopts = filterwarnings = error ignore:Please use `dok_matrix` from the `scipy\.sparse` namespace, the `scipy\.sparse\.dok` namespace is deprecated.:DeprecationWarning - ignore:There is no current event loop:DeprecationWarning ignore:elementwise comparison failed. this will raise an error in the future:DeprecationWarning ignore:unclosed Date: Mon, 6 Jun 2022 13:24:53 +0100 Subject: [PATCH 10/12] cluster attributes may not have been assigned by the time atexit is called --- distributed/deploy/spec.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index 4934f38d81a..a5f482b15b6 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -670,7 +670,7 @@ async def run_spec(spec: dict[str, Any], *args: Any) -> dict[str, Worker | Nanny @atexit.register def close_clusters(): for cluster in list(SpecCluster._instances): - if cluster.shutdown_on_close: + if getattr(cluster, "shutdown_on_close", False): with suppress(gen.TimeoutError, TimeoutError): - if cluster.status != Status.closed: + if getattr(cluster, "status", Status.closed) != Status.closed: cluster.close(timeout=10) From bc98b4ea6f80af790abb8e12d1f7de53504dfa67 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Wed, 6 Jul 2022 09:41:36 +0100 Subject: [PATCH 11/12] mark deprecated those tests that use loop=None --- distributed/deploy/tests/test_adaptive.py | 1 + distributed/deploy/tests/test_spec_cluster.py | 1 + distributed/tests/test_client.py | 3 +++ distributed/tests/test_client_loop.py | 4 ++++ distributed/tests/test_utils.py | 2 ++ 5 files changed, 11 insertions(+) diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 2712b12be78..051f551e110 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -289,6 +289,7 @@ async def test_no_more_workers_than_tasks(): assert len(cluster.scheduler.workers) <= 1 +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_basic_no_loop(cleanup): loop = None try: diff --git a/distributed/deploy/tests/test_spec_cluster.py b/distributed/deploy/tests/test_spec_cluster.py index a7a1a2f677c..9121abb45ff 100644 --- a/distributed/deploy/tests/test_spec_cluster.py +++ b/distributed/deploy/tests/test_spec_cluster.py @@ -82,6 +82,7 @@ def test_spec_sync(loop): assert result == 11 +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_loop_started_in_constructor(cleanup): # test that SpecCluster.__init__ starts a loop in another thread cluster = SpecCluster(worker_spec, scheduler=scheduler, loop=None) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 81d74ee3105..8aac18ffd52 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -2874,6 +2874,7 @@ async def test_startup_close_startup(s, a, b): await c.close() +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_startup_close_startup_sync(loop): with cluster() as (s, [a, b]): with Client(s["address"], loop=loop) as c: @@ -5537,6 +5538,7 @@ async def test_future_auto_inform(c, s, a, b): await client.close() +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_client_async_before_loop_starts(cleanup): async def close(): async with client: @@ -6846,6 +6848,7 @@ async def test_workers_collection_restriction(c, s, a, b): assert a.data and not b.data +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) async def test_get_client_functions_spawn_clusters(c, s, a): # see gh4565 diff --git a/distributed/tests/test_client_loop.py b/distributed/tests/test_client_loop.py index 723031145cc..35bb3a7e861 100644 --- a/distributed/tests/test_client_loop.py +++ b/distributed/tests/test_client_loop.py @@ -2,6 +2,8 @@ import contextlib +import pytest + from distributed import Client, LocalCluster from distributed.utils import LoopRunner @@ -27,12 +29,14 @@ def _check_cluster_and_client_loop(loop): # Test if Client stops LoopRunner on close. +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_close_loop_sync_start_new_loop(cleanup): with _check_loop_runner(): _check_cluster_and_client_loop(loop=None) # Test if Client stops LoopRunner on close. +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_close_loop_sync_use_running_loop(cleanup): with _check_loop_runner(): # Start own loop or use current thread's one. diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index 53f0802aa61..853ba578f11 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -404,6 +404,7 @@ def assert_not_running(loop): ) +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_loop_runner(loop_in_thread): # Implicit loop loop = IOLoop() @@ -486,6 +487,7 @@ def test_loop_runner(loop_in_thread): assert_not_running(runner.loop) +@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") def test_two_loop_runners(loop_in_thread): # Loop runners tied to the same loop should cooperate From 8a99fa3a7387c1f8362fb2b78f53ed1bb2006c88 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Thu, 7 Jul 2022 15:32:50 +0100 Subject: [PATCH 12/12] test deprecated loop and io_loop properties --- distributed/deploy/tests/test_cluster.py | 17 +++++++++++++++++ .../diagnostics/tests/test_progressbar.py | 18 ++++++++++++++++++ distributed/tests/test_client.py | 19 +++++++++++++++++++ 3 files changed, 54 insertions(+) diff --git a/distributed/deploy/tests/test_cluster.py b/distributed/deploy/tests/test_cluster.py index f1644632d95..546fe539edc 100644 --- a/distributed/deploy/tests/test_cluster.py +++ b/distributed/deploy/tests/test_cluster.py @@ -1,6 +1,7 @@ from __future__ import annotations import pytest +from tornado.ioloop import IOLoop from distributed.deploy.cluster import Cluster from distributed.utils_test import gen_test @@ -48,3 +49,19 @@ def __init__(self): assert "foo" in cluster._cluster_info # exists before start() called with cluster: # start and stop the cluster to avoid a resource warning pass + + +@gen_test() +async def test_deprecated_loop_properties(): + class ExampleCluster(Cluster): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.loop = self.io_loop = IOLoop.current() + + with pytest.warns(DeprecationWarning) as warninfo: + async with ExampleCluster(asynchronous=True, loop=IOLoop.current()): + pass + + assert [(w.category, *w.message.args) for w in warninfo] == [ + (DeprecationWarning, "setting the loop property is deprecated") + ] diff --git a/distributed/diagnostics/tests/test_progressbar.py b/distributed/diagnostics/tests/test_progressbar.py index 39f7a15bbf9..72c41325c5f 100644 --- a/distributed/diagnostics/tests/test_progressbar.py +++ b/distributed/diagnostics/tests/test_progressbar.py @@ -2,6 +2,9 @@ from time import sleep +import pytest +from tornado.ioloop import IOLoop + from distributed.diagnostics.progressbar import TextProgressBar, progress from distributed.metrics import time from distributed.utils_test import div, gen_cluster, inc @@ -72,3 +75,18 @@ def test_progress_function_w_kwargs(client, capsys): progress(f, interval="20ms", loop=client.loop) check_bar_completed(capsys) + + +@gen_cluster(client=True, nthreads=[]) +async def test_deprecated_loop_properties(c, s): + class ExampleTextProgressBar(TextProgressBar): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.loop = self.io_loop = IOLoop.current() + + with pytest.warns(DeprecationWarning) as warninfo: + ExampleTextProgressBar(client=c, keys=[], start=False, loop=IOLoop.current()) + + assert [(w.category, *w.message.args) for w in warninfo] == [ + (DeprecationWarning, "setting the loop property is deprecated") + ] diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 8aac18ffd52..cd1ebc1849c 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -31,6 +31,7 @@ import pytest import yaml from tlz import concat, first, identity, isdistinct, merge, pluck, valmap +from tornado.ioloop import IOLoop import dask import dask.bag as db @@ -7562,3 +7563,21 @@ def test_quiet_close_process(processes, tmp_path): assert not out assert not err + + +@gen_cluster(client=False, nthreads=[]) +async def test_deprecated_loop_properties(s): + class ExampleClient(Client): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.loop = self.io_loop = IOLoop.current() + + with pytest.warns(DeprecationWarning) as warninfo: + async with ExampleClient(s.address, asynchronous=True, loop=IOLoop.current()): + pass + + assert [(w.category, *w.message.args) for w in warninfo] == [ + (DeprecationWarning, "setting the loop property is deprecated"), + (DeprecationWarning, "The io_loop property is deprecated"), + (DeprecationWarning, "setting the loop property is deprecated"), + ]