Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions distributed/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ def __reduce__(self):
@property
def _io_loop(self):
if self._worker:
return self._worker.io_loop
return self._worker.loop
else:
return self._client.io_loop
return self._client.loop

@property
def _scheduler_rpc(self):
Expand Down
38 changes: 35 additions & 3 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
except ImportError:
single_key = first
from tornado import gen
from tornado.ioloop import PeriodicCallback
from tornado.ioloop import IOLoop, PeriodicCallback

import distributed.utils
from distributed import cluster_dump, preloading
Expand Down Expand Up @@ -760,6 +760,7 @@ class Client(SyncMethodMixin):
_default_event_handlers = {"print": _handle_print, "warn": _handle_warn}

preloads: list[preloading.Preload]
__loop: IOLoop | None = None

def __init__(
self,
Expand Down Expand Up @@ -872,7 +873,6 @@ def __init__(

self._asynchronous = asynchronous
self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
self.io_loop = self.loop = self._loop_runner.loop
self._connecting_to_scheduler = False

self._gather_keys = None
Expand Down Expand Up @@ -944,6 +944,38 @@ def __init__(

ReplayTaskClient(self)

@property
def io_loop(self) -> IOLoop | None:
Comment thread
gjoseph92 marked this conversation as resolved.
warnings.warn(
"The io_loop property is deprecated", DeprecationWarning, stacklevel=2
Comment thread
gjoseph92 marked this conversation as resolved.
)
return self.loop

@io_loop.setter
def io_loop(self, value: IOLoop) -> None:
warnings.warn(
"The io_loop property is deprecated", DeprecationWarning, stacklevel=2
)
self.loop = value

@property
def loop(self) -> IOLoop | None:
loop = self.__loop
if loop is None:
# If the loop is not running when this is called, the LoopRunner.loop
# property will raise a DeprecationWarning
# However subsequent calls might occur - eg atexit, where a stopped
# loop is still acceptable - so we cache access to the loop.
self.__loop = loop = self._loop_runner.loop
return loop

@loop.setter
def loop(self, value: IOLoop) -> None:
warnings.warn(
"setting the loop property is deprecated", DeprecationWarning, stacklevel=2
)
self.__loop = value

@contextmanager
def as_current(self):
"""Thread-local, Task-local context manager that causes the Client.current
Expand Down Expand Up @@ -1356,7 +1388,7 @@ def __exit__(self, exc_type, exc_value, traceback):
def __del__(self):
# If the loop never got assigned, we failed early in the constructor,
# nothing to do
if hasattr(self, "loop"):
if self.__loop is not None:
self.close()

def _inc_ref(self, key):
Expand Down
24 changes: 22 additions & 2 deletions distributed/deploy/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from inspect import isawaitable
from typing import Any

from tornado.ioloop import PeriodicCallback
from tornado.ioloop import IOLoop, PeriodicCallback

import dask.config
from dask.utils import _deprecated, format_bytes, parse_timedelta, typename
Expand Down Expand Up @@ -55,6 +55,7 @@ class Cluster(SyncMethodMixin):
"""

_supports_scaling = True
__loop: IOLoop | None = None

def __init__(
self,
Expand All @@ -65,7 +66,6 @@ def __init__(
scheduler_sync_interval=1,
):
self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
self.loop = self._loop_runner.loop

self.scheduler_info = {"workers": {}}
self.periodic_callbacks = {}
Expand All @@ -89,6 +89,26 @@ def __init__(
}
self.status = Status.created

@property
def loop(self) -> IOLoop | None:
loop = self.__loop
if loop is None:
# If the loop is not running when this is called, the LoopRunner.loop
# property will raise a DeprecationWarning
# However subsequent calls might occur - eg atexit, where a stopped
# loop is still acceptable - so we cache access to the loop.
self.__loop = loop = self._loop_runner.loop
return loop

@loop.setter
def loop(self, value: IOLoop) -> None:
warnings.warn(
"setting the loop property is deprecated", DeprecationWarning, stacklevel=2
)
if value is None:
raise ValueError("expected an IOLoop, got None")
self.__loop = value

@property
def name(self):
return self._cluster_info["name"]
Expand Down
17 changes: 14 additions & 3 deletions distributed/deploy/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import TYPE_CHECKING, Any, ClassVar

from tornado import gen
from tornado.ioloop import IOLoop

import dask
from dask.utils import parse_bytes, parse_timedelta
Expand Down Expand Up @@ -230,6 +231,9 @@ def __init__(
shutdown_on_close=True,
scheduler_sync_interval=1,
):
if loop is None and asynchronous:
loop = IOLoop.current()

self._created = weakref.WeakSet()

self.scheduler_spec = copy.copy(scheduler)
Expand Down Expand Up @@ -259,7 +263,14 @@ def __init__(
scheduler_sync_interval=scheduler_sync_interval,
)

if not self.asynchronous:
try:
called_from_running_loop = (
getattr(loop, "asyncio_loop", None) is asyncio.get_running_loop()
)
except RuntimeError:
called_from_running_loop = asynchronous

if not called_from_running_loop:
self._loop_runner.start()
self.sync(self._start)
try:
Expand Down Expand Up @@ -659,7 +670,7 @@ async def run_spec(spec: dict[str, Any], *args: Any) -> dict[str, Worker | Nanny
@atexit.register
def close_clusters():
for cluster in list(SpecCluster._instances):
if cluster.shutdown_on_close:
if getattr(cluster, "shutdown_on_close", False):
with suppress(gen.TimeoutError, TimeoutError):
if cluster.status != Status.closed:
if getattr(cluster, "status", Status.closed) != Status.closed:
cluster.close(timeout=10)
1 change: 1 addition & 0 deletions distributed/deploy/tests/test_adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ async def test_no_more_workers_than_tasks():
assert len(cluster.scheduler.workers) <= 1


@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning")
def test_basic_no_loop(cleanup):
loop = None
try:
Expand Down
17 changes: 17 additions & 0 deletions distributed/deploy/tests/test_cluster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import pytest
from tornado.ioloop import IOLoop

from distributed.deploy.cluster import Cluster
from distributed.utils_test import gen_test
Expand Down Expand Up @@ -35,3 +36,19 @@ async def test_logs_deprecated():
async with Cluster(asynchronous=True) as cluster:
with pytest.warns(FutureWarning, match="get_logs"):
cluster.logs()


@gen_test()
async def test_deprecated_loop_properties():
class ExampleCluster(Cluster):
Comment thread
gjoseph92 marked this conversation as resolved.
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.loop = self.io_loop = IOLoop.current()
Comment thread
gjoseph92 marked this conversation as resolved.

with pytest.warns(DeprecationWarning) as warninfo:
async with ExampleCluster(asynchronous=True, loop=IOLoop.current()):
pass

assert [(w.category, *w.message.args) for w in warninfo] == [
(DeprecationWarning, "setting the loop property is deprecated")
]
1 change: 1 addition & 0 deletions distributed/deploy/tests/test_spec_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def test_spec_sync(loop):
assert result == 11


@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning")
def test_loop_started_in_constructor(cleanup):
# test that SpecCluster.__init__ starts a loop in another thread
cluster = SpecCluster(worker_spec, scheduler=scheduler, loop=None)
Expand Down
24 changes: 22 additions & 2 deletions distributed/diagnostics/progressbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import html
import logging
import sys
import warnings
import weakref
from contextlib import suppress
from timeit import default_timer
Expand Down Expand Up @@ -111,6 +112,8 @@ def __del__(self):


class TextProgressBar(ProgressBar):
__loop: IOLoop | None = None

def __init__(
self,
keys,
Expand All @@ -122,14 +125,31 @@ def __init__(
start=True,
**kwargs,
):
self._loop_runner = loop_runner = LoopRunner(loop=loop)
super().__init__(keys, scheduler, interval, complete)
self.width = width
self.loop = loop or IOLoop()

if start:
loop_runner = LoopRunner(self.loop)
loop_runner.run_sync(self.listen)

@property
def loop(self) -> IOLoop | None:
loop = self.__loop
if loop is None:
# If the loop is not running when this is called, the LoopRunner.loop
# property will raise a DeprecationWarning
# However subsequent calls might occur - eg atexit, where a stopped
# loop is still acceptable - so we cache access to the loop.
self.__loop = loop = self._loop_runner.loop
return loop

@loop.setter
def loop(self, value: IOLoop) -> None:
warnings.warn(
"setting the loop property is deprecated", DeprecationWarning, stacklevel=2
)
self.__loop = value

def _draw_bar(self, remaining, all, **kwargs):
frac = (1 - remaining / all) if all else 1.0
bar = "#" * int(self.width * frac)
Expand Down
18 changes: 18 additions & 0 deletions distributed/diagnostics/tests/test_progressbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

from time import sleep

import pytest
from tornado.ioloop import IOLoop

from distributed.diagnostics.progressbar import TextProgressBar, progress
from distributed.metrics import time
from distributed.utils_test import div, gen_cluster, inc
Expand Down Expand Up @@ -72,3 +75,18 @@ def test_progress_function_w_kwargs(client, capsys):

progress(f, interval="20ms", loop=client.loop)
check_bar_completed(capsys)


@gen_cluster(client=True, nthreads=[])
async def test_deprecated_loop_properties(c, s):
class ExampleTextProgressBar(TextProgressBar):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.loop = self.io_loop = IOLoop.current()

with pytest.warns(DeprecationWarning) as warninfo:
ExampleTextProgressBar(client=c, keys=[], start=False, loop=IOLoop.current())

assert [(w.category, *w.message.args) for w in warninfo] == [
(DeprecationWarning, "setting the loop property is deprecated")
]
2 changes: 1 addition & 1 deletion distributed/semaphore.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def __init__(
except ValueError:
client = get_client()
self.scheduler = scheduler_rpc or client.scheduler
self.loop = loop or client.io_loop
self.loop = loop or client.loop

self.name = name or "semaphore-" + uuid.uuid4().hex
self.max_leases = max_leases
Expand Down
28 changes: 27 additions & 1 deletion distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import pytest
import yaml
from tlz import concat, first, identity, isdistinct, merge, pluck, valmap
from tornado.ioloop import IOLoop

import dask
import dask.bag as db
Expand Down Expand Up @@ -2873,6 +2874,7 @@ async def test_startup_close_startup(s, a, b):
await c.close()


@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning")
def test_startup_close_startup_sync(loop):
with cluster() as (s, [a, b]):
with Client(s["address"], loop=loop) as c:
Expand Down Expand Up @@ -5532,13 +5534,18 @@ async def test_future_auto_inform(c, s, a, b):
await client.close()


@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning")
def test_client_async_before_loop_starts(cleanup):
async def close():
async with client:
pass

with pristine_loop() as loop:
client = Client(asynchronous=True, loop=loop)
with pytest.warns(
DeprecationWarning,
match=r"Constructing LoopRunner\(loop=loop\) without a running loop is deprecated",
):
client = Client(asynchronous=True, loop=loop)
assert client.asynchronous
assert isinstance(client.close(), NoOpAwaitable)
loop.run_sync(close) # TODO: client.close() does not unset global client
Expand Down Expand Up @@ -6837,6 +6844,7 @@ async def test_workers_collection_restriction(c, s, a, b):
assert a.data and not b.data


@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning")
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)])
async def test_get_client_functions_spawn_clusters(c, s, a):
# see gh4565
Expand Down Expand Up @@ -7550,3 +7558,21 @@ def test_quiet_close_process(processes, tmp_path):

assert not out
assert not err


@gen_cluster(client=False, nthreads=[])
async def test_deprecated_loop_properties(s):
class ExampleClient(Client):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.loop = self.io_loop = IOLoop.current()

with pytest.warns(DeprecationWarning) as warninfo:
async with ExampleClient(s.address, asynchronous=True, loop=IOLoop.current()):
pass

assert [(w.category, *w.message.args) for w in warninfo] == [
(DeprecationWarning, "setting the loop property is deprecated"),
(DeprecationWarning, "The io_loop property is deprecated"),
(DeprecationWarning, "setting the loop property is deprecated"),
]
Loading