diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py index bb918a24ff..f4df05c181 100644 --- a/distributed/active_memory_manager.py +++ b/distributed/active_memory_manager.py @@ -13,6 +13,8 @@ from collections.abc import Generator from typing import TYPE_CHECKING, Any, Literal, NamedTuple, Union +from tornado.ioloop import PeriodicCallback + import dask from dask.typing import Key from dask.utils import parse_timedelta @@ -22,7 +24,6 @@ # https://github.com/agronholm/sphinx-autodoc-typehints#dealing-with-circular-imports from distributed import client from distributed import scheduler as scheduler_module -from distributed.compatibility import PeriodicCallback from distributed.core import Status from distributed.metrics import time from distributed.utils import import_term, log_errors diff --git a/distributed/client.py b/distributed/client.py index 0d41254a04..bdbc693c29 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -50,7 +50,7 @@ from packaging.version import parse as parse_version from tlz import first, groupby, merge, partition_all, valmap from tornado import gen -from tornado.ioloop import IOLoop +from tornado.ioloop import IOLoop, PeriodicCallback import dask from dask._expr import Expr, HLGExpr, LLGExpr @@ -76,7 +76,6 @@ from distributed import versions as version_module from distributed.batched import BatchedSend from distributed.cfexecutor import ClientExecutor -from distributed.compatibility import PeriodicCallback from distributed.core import ( CommClosedError, ConnectionPool, diff --git a/distributed/compatibility.py b/distributed/compatibility.py index 3ec4c9212e..dfd3005fb6 100644 --- a/distributed/compatibility.py +++ b/distributed/compatibility.py @@ -2,15 +2,12 @@ import asyncio import logging -import random import sys import warnings from collections.abc import Callable, Coroutine from typing import Any, TypeVar -import tornado - -__all__ = ["logging_names", "PeriodicCallback", "to_thread", "randbytes"] +__all__ = ["logging_names"] logging_names: dict[str | int, int | str] = {} logging_names.update(logging._levelToName) # type: ignore @@ -20,172 +17,6 @@ MACOS = sys.platform == "darwin" WINDOWS = sys.platform == "win32" - -def to_thread(*args, **kwargs): - warnings.warn( - "to_thread is deprecated and will be removed in a future release; use asyncio.to_thread instead.", - DeprecationWarning, - stacklevel=2, - ) - return asyncio.to_thread(*args, **kwargs) - - -def randbytes(*args, **kwargs): - warnings.warn( - "randbytes is deprecated and will be removed in a future release; use random.randbytes instead.", - DeprecationWarning, - stacklevel=2, - ) - return random.randbytes(*args, **kwargs) - - -if tornado.version_info >= (6, 2, 0, 0): - from tornado.ioloop import PeriodicCallback -else: - # Backport from https://github.com/tornadoweb/tornado/blob/a4f08a31a348445094d1efa17880ed5472db9f7d/tornado/ioloop.py#L838-L962 - # License https://github.com/tornadoweb/tornado/blob/v6.2.0/LICENSE - # Includes minor modifications to source code to pass linting - - # This backport ensures that async callbacks are not overlapping if a run - # takes longer than the interval - import datetime - import math - from collections.abc import Awaitable - from inspect import isawaitable - - from tornado.ioloop import IOLoop - from tornado.log import app_log - - class PeriodicCallback: # type: ignore[no-redef] - """Schedules the given callback to be called periodically. - - The callback is called every ``callback_time`` milliseconds when - ``callback_time`` is a float. Note that the timeout is given in - milliseconds, while most other time-related functions in Tornado use - seconds. ``callback_time`` may alternatively be given as a - `datetime.timedelta` object. - - If ``jitter`` is specified, each callback time will be randomly selected - within a window of ``jitter * callback_time`` milliseconds. - Jitter can be used to reduce alignment of events with similar periods. - A jitter of 0.1 means allowing a 10% variation in callback time. - The window is centered on ``callback_time`` so the total number of calls - within a given interval should not be significantly affected by adding - jitter. - - If the callback runs for longer than ``callback_time`` milliseconds, - subsequent invocations will be skipped to get back on schedule. - - `start` must be called after the `PeriodicCallback` is created. - - .. versionchanged:: 5.0 - The ``io_loop`` argument (deprecated since version 4.1) has been removed. - - .. versionchanged:: 5.1 - The ``jitter`` argument is added. - - .. versionchanged:: 6.2 - If the ``callback`` argument is a coroutine, and a callback runs for - longer than ``callback_time``, subsequent invocations will be skipped. - Previously this was only true for regular functions, not coroutines, - which were "fire-and-forget" for `PeriodicCallback`. - - The ``callback_time`` argument now accepts `datetime.timedelta` objects, - in addition to the previous numeric milliseconds. - """ - - def __init__( - self, - callback: Callable[[], Awaitable | None], - callback_time: datetime.timedelta | float, - jitter: float = 0, - ) -> None: - self.callback = callback - if isinstance(callback_time, datetime.timedelta): - self.callback_time = callback_time / datetime.timedelta(milliseconds=1) - else: - if callback_time <= 0: - raise ValueError( - "Periodic callback must have a positive callback_time" - ) - self.callback_time = callback_time - self.jitter = jitter - self._running = False - self._timeout = None # type: object - - def start(self) -> None: - """Starts the timer.""" - # Looking up the IOLoop here allows to first instantiate the - # PeriodicCallback in another thread, then start it using - # IOLoop.add_callback(). - self.io_loop = IOLoop.current() - self._running = True - self._next_timeout = self.io_loop.time() - self._schedule_next() - - def stop(self) -> None: - """Stops the timer.""" - self._running = False - if self._timeout is not None: - self.io_loop.remove_timeout(self._timeout) - self._timeout = None - - def is_running(self) -> bool: - """Returns ``True`` if this `.PeriodicCallback` has been started. - - .. versionadded:: 4.1 - """ - return self._running - - async def _run(self) -> None: - if not self._running: - return - try: - val = self.callback() - if val is not None and isawaitable(val): - await val - except Exception: - app_log.error("Exception in callback %r", self.callback, exc_info=True) - finally: - self._schedule_next() - - def _schedule_next(self) -> None: - if self._running: - self._update_next(self.io_loop.time()) - self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run) - - def _update_next(self, current_time: float) -> None: - callback_time_sec = self.callback_time / 1000.0 - if self.jitter: - # apply jitter fraction - callback_time_sec *= 1 + (self.jitter * (random.random() - 0.5)) - if self._next_timeout <= current_time: - # The period should be measured from the start of one call - # to the start of the next. If one call takes too long, - # skip cycles to get back to a multiple of the original - # schedule. - self._next_timeout += ( - math.floor((current_time - self._next_timeout) / callback_time_sec) - + 1 - ) * callback_time_sec - else: - # If the clock moved backwards, ensure we advance the next - # timeout instead of recomputing the same value again. - # This may result in long gaps between callbacks if the - # clock jumps backwards by a lot, but the far more common - # scenario is a small NTP adjustment that should just be - # ignored. - # - # Note that on some systems if time.time() runs slower - # than time.monotonic() (most common on windows), we - # effectively experience a small backwards time jump on - # every iteration because PeriodicCallback uses - # time.time() while asyncio schedules callbacks using - # time.monotonic(). - # https://github.com/tornadoweb/tornado/issues/2333 - self._next_timeout += callback_time_sec - - _T = TypeVar("_T") if sys.version_info >= (3, 12): @@ -265,3 +96,17 @@ def _cancel_all_tasks(loop: asyncio.AbstractEventLoop) -> None: "task": task, } ) + + +def __getattr__(name: str) -> Any: + if name == "PeriodicCallback": + warnings.warn( + "distributed.compatibility.PeriodicCallback is deprecated and will be removed " + "in a future release; use tornado.ioloop.PeriodicCallback instead.", + DeprecationWarning, + stacklevel=2, + ) + from tornado.ioloop import PeriodicCallback + + return PeriodicCallback + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/distributed/core.py b/distributed/core.py index dc5f7733fe..e9cc387486 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -29,7 +29,7 @@ import tblib from tlz import merge -from tornado.ioloop import IOLoop +from tornado.ioloop import IOLoop, PeriodicCallback import dask from dask.utils import parse_timedelta @@ -46,7 +46,6 @@ unparse_host_port, ) from distributed.comm.core import Listener -from distributed.compatibility import PeriodicCallback from distributed.counter import Counter from distributed.diskutils import WorkDir, WorkSpace from distributed.metrics import context_meter, time diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index 406928b1b0..0b16553945 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -6,12 +6,11 @@ from inspect import isawaitable from typing import TYPE_CHECKING, Any, Literal, cast -from tornado.ioloop import IOLoop +from tornado.ioloop import IOLoop, PeriodicCallback import dask.config from dask.utils import parse_timedelta -from distributed.compatibility import PeriodicCallback from distributed.core import Status from distributed.deploy.adaptive_core import AdaptiveCore from distributed.protocol import pickle diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 7b68f67114..fe283faaa3 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -9,13 +9,12 @@ from typing import Any from packaging.version import parse as parse_version -from tornado.ioloop import IOLoop +from tornado.ioloop import IOLoop, PeriodicCallback import dask.config from dask.utils import _deprecated, format_bytes, parse_timedelta, typename from dask.widgets import get_template -from distributed.compatibility import PeriodicCallback from distributed.core import Status from distributed.deploy.adaptive import Adaptive from distributed.exceptions import WorkerStartTimeoutError diff --git a/distributed/diagnostics/memory_sampler.py b/distributed/diagnostics/memory_sampler.py index 9ee5cc7726..98e6913eb3 100644 --- a/distributed/diagnostics/memory_sampler.py +++ b/distributed/diagnostics/memory_sampler.py @@ -6,7 +6,7 @@ from datetime import datetime from typing import TYPE_CHECKING, Any -from distributed.compatibility import PeriodicCallback +from tornado.ioloop import PeriodicCallback if TYPE_CHECKING: # Optional runtime dependencies diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b7c5aabacb..45f93600cf 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -59,7 +59,7 @@ take, valmap, ) -from tornado.ioloop import IOLoop +from tornado.ioloop import IOLoop, PeriodicCallback import dask from dask._expr import LLGExpr @@ -96,7 +96,6 @@ unparse_host_port, ) from distributed.comm.addressing import addresses_from_user_args -from distributed.compatibility import PeriodicCallback from distributed.core import ( ErrorMessage, OKMessage, diff --git a/distributed/semaphore.py b/distributed/semaphore.py index 1a53a03758..3ede6d8b61 100644 --- a/distributed/semaphore.py +++ b/distributed/semaphore.py @@ -7,10 +7,11 @@ from asyncio import TimeoutError from collections import defaultdict, deque +from tornado.ioloop import PeriodicCallback + import dask from dask.utils import parse_timedelta -from distributed.compatibility import PeriodicCallback from distributed.metrics import time from distributed.utils import Deadline, SyncMethodMixin, log_errors, wait_for from distributed.utils_comm import retry_operation diff --git a/distributed/stealing.py b/distributed/stealing.py index 63b4a31f50..cc6f1e6d65 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -10,12 +10,12 @@ from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, cast from tlz import topk +from tornado.ioloop import PeriodicCallback import dask from dask.typing import Key from dask.utils import parse_timedelta -from distributed.compatibility import PeriodicCallback from distributed.core import CommClosedError from distributed.diagnostics.plugin import SchedulerPlugin from distributed.utils import log_errors, recursive_to_dict diff --git a/distributed/tests/test_compatibility.py b/distributed/tests/test_compatibility.py index b6368ee098..b5b9e4c5b1 100644 --- a/distributed/tests/test_compatibility.py +++ b/distributed/tests/test_compatibility.py @@ -1,36 +1,9 @@ -from __future__ import annotations - -import random -from collections import Counter - import pytest -from distributed.compatibility import randbytes - - -def test_randbytes(): - with pytest.warns( - DeprecationWarning, - match=r"randbytes is deprecated and will be removed in a future release; " - r"use random\.randbytes instead\.", - ): - x = randbytes(256_000) - assert isinstance(x, bytes) - assert len(x) == 256_000 - c = Counter(x) - for i in range(256): - assert 800 < c[i] < 1200, (i, c[i]) +def test_deprecated_periodic_callback(): + from tornado.ioloop import PeriodicCallback -def test_randbytes_seed(): - state = random.getstate() - try: - random.seed(0) - with pytest.warns( - DeprecationWarning, - match=r"randbytes is deprecated and will be removed in a future release; " - r"use random\.randbytes instead\.", - ): - assert randbytes(8) == b"\xcd\x07,\xd8\xbeo\x9fb" - finally: - random.setstate(state) + with pytest.warns(DeprecationWarning): + from distributed.compatibility import PeriodicCallback as compat + assert compat is PeriodicCallback diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index e2f714e1c6..b6047c80b8 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -20,7 +20,7 @@ import psutil import pytest from tlz import concat, first, merge -from tornado.ioloop import IOLoop +from tornado.ioloop import IOLoop, PeriodicCallback import dask from dask import bag, delayed @@ -41,7 +41,7 @@ wait, ) from distributed.comm.addressing import parse_host_port -from distributed.compatibility import LINUX, MACOS, WINDOWS, PeriodicCallback +from distributed.compatibility import LINUX, MACOS, WINDOWS from distributed.core import ConnectionPool, Status, clean_exception, connect, rpc from distributed.metrics import time from distributed.protocol import serialize diff --git a/distributed/worker.py b/distributed/worker.py index 79f24b22da..9a1c1ed3e8 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -42,7 +42,7 @@ ) from tlz import keymap, pluck -from tornado.ioloop import IOLoop +from tornado.ioloop import IOLoop, PeriodicCallback import dask from dask._task_spec import GraphNode @@ -64,7 +64,6 @@ from distributed.comm import Comm, connect, get_address_host, parse_address from distributed.comm import resolve_address as comm_resolve_address from distributed.comm.addressing import address_from_user_args -from distributed.compatibility import PeriodicCallback from distributed.core import ( ConnectionPool, ErrorMessage, diff --git a/distributed/worker_memory.py b/distributed/worker_memory.py index 4f0c75d499..b92b005c20 100644 --- a/distributed/worker_memory.py +++ b/distributed/worker_memory.py @@ -32,6 +32,7 @@ from typing import TYPE_CHECKING, Any, Literal, cast import psutil +from tornado.ioloop import PeriodicCallback import dask.config from dask.system import CPU_COUNT @@ -39,7 +40,7 @@ from dask.utils import format_bytes, parse_bytes, parse_timedelta from distributed import system -from distributed.compatibility import WINDOWS, PeriodicCallback +from distributed.compatibility import WINDOWS from distributed.core import Status from distributed.gc import ThrottledGC from distributed.metrics import context_meter, monotonic