Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion distributed/active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from collections.abc import Generator
from typing import TYPE_CHECKING, Any, Literal, NamedTuple, Union

from tornado.ioloop import PeriodicCallback

import dask
from dask.typing import Key
from dask.utils import parse_timedelta
Expand All @@ -22,7 +24,6 @@
# https://github.com/agronholm/sphinx-autodoc-typehints#dealing-with-circular-imports
from distributed import client
from distributed import scheduler as scheduler_module
from distributed.compatibility import PeriodicCallback
from distributed.core import Status
from distributed.metrics import time
from distributed.utils import import_term, log_errors
Expand Down
3 changes: 1 addition & 2 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from packaging.version import parse as parse_version
from tlz import first, groupby, merge, partition_all, valmap
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.ioloop import IOLoop, PeriodicCallback

import dask
from dask._expr import Expr, HLGExpr, LLGExpr
Expand All @@ -76,7 +76,6 @@
from distributed import versions as version_module
from distributed.batched import BatchedSend
from distributed.cfexecutor import ClientExecutor
from distributed.compatibility import PeriodicCallback
from distributed.core import (
CommClosedError,
ConnectionPool,
Expand Down
185 changes: 15 additions & 170 deletions distributed/compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

import asyncio
import logging
import random
import sys
import warnings
from collections.abc import Callable, Coroutine
from typing import Any, TypeVar

import tornado

__all__ = ["logging_names", "PeriodicCallback", "to_thread", "randbytes"]
__all__ = ["logging_names"]

logging_names: dict[str | int, int | str] = {}
logging_names.update(logging._levelToName) # type: ignore
Expand All @@ -20,172 +17,6 @@
MACOS = sys.platform == "darwin"
WINDOWS = sys.platform == "win32"


def to_thread(*args, **kwargs):
warnings.warn(
"to_thread is deprecated and will be removed in a future release; use asyncio.to_thread instead.",
DeprecationWarning,
stacklevel=2,
)
return asyncio.to_thread(*args, **kwargs)


def randbytes(*args, **kwargs):
warnings.warn(
"randbytes is deprecated and will be removed in a future release; use random.randbytes instead.",
DeprecationWarning,
stacklevel=2,
)
return random.randbytes(*args, **kwargs)


if tornado.version_info >= (6, 2, 0, 0):
from tornado.ioloop import PeriodicCallback
else:
# Backport from https://github.com/tornadoweb/tornado/blob/a4f08a31a348445094d1efa17880ed5472db9f7d/tornado/ioloop.py#L838-L962
# License https://github.com/tornadoweb/tornado/blob/v6.2.0/LICENSE
# Includes minor modifications to source code to pass linting

# This backport ensures that async callbacks are not overlapping if a run
# takes longer than the interval
import datetime
import math
from collections.abc import Awaitable
from inspect import isawaitable

from tornado.ioloop import IOLoop
from tornado.log import app_log

class PeriodicCallback: # type: ignore[no-redef]
"""Schedules the given callback to be called periodically.

The callback is called every ``callback_time`` milliseconds when
``callback_time`` is a float. Note that the timeout is given in
milliseconds, while most other time-related functions in Tornado use
seconds. ``callback_time`` may alternatively be given as a
`datetime.timedelta` object.

If ``jitter`` is specified, each callback time will be randomly selected
within a window of ``jitter * callback_time`` milliseconds.
Jitter can be used to reduce alignment of events with similar periods.
A jitter of 0.1 means allowing a 10% variation in callback time.
The window is centered on ``callback_time`` so the total number of calls
within a given interval should not be significantly affected by adding
jitter.

If the callback runs for longer than ``callback_time`` milliseconds,
subsequent invocations will be skipped to get back on schedule.

`start` must be called after the `PeriodicCallback` is created.

.. versionchanged:: 5.0
The ``io_loop`` argument (deprecated since version 4.1) has been removed.

.. versionchanged:: 5.1
The ``jitter`` argument is added.

.. versionchanged:: 6.2
If the ``callback`` argument is a coroutine, and a callback runs for
longer than ``callback_time``, subsequent invocations will be skipped.
Previously this was only true for regular functions, not coroutines,
which were "fire-and-forget" for `PeriodicCallback`.

The ``callback_time`` argument now accepts `datetime.timedelta` objects,
in addition to the previous numeric milliseconds.
"""

def __init__(
self,
callback: Callable[[], Awaitable | None],
callback_time: datetime.timedelta | float,
jitter: float = 0,
) -> None:
self.callback = callback
if isinstance(callback_time, datetime.timedelta):
self.callback_time = callback_time / datetime.timedelta(milliseconds=1)
else:
if callback_time <= 0:
raise ValueError(
"Periodic callback must have a positive callback_time"
)
self.callback_time = callback_time
self.jitter = jitter
self._running = False
self._timeout = None # type: object

def start(self) -> None:
"""Starts the timer."""
# Looking up the IOLoop here allows to first instantiate the
# PeriodicCallback in another thread, then start it using
# IOLoop.add_callback().
self.io_loop = IOLoop.current()
self._running = True
self._next_timeout = self.io_loop.time()
self._schedule_next()

def stop(self) -> None:
"""Stops the timer."""
self._running = False
if self._timeout is not None:
self.io_loop.remove_timeout(self._timeout)
self._timeout = None

def is_running(self) -> bool:
"""Returns ``True`` if this `.PeriodicCallback` has been started.

.. versionadded:: 4.1
"""
return self._running

async def _run(self) -> None:
if not self._running:
return
try:
val = self.callback()
if val is not None and isawaitable(val):
await val
except Exception:
app_log.error("Exception in callback %r", self.callback, exc_info=True)
finally:
self._schedule_next()

def _schedule_next(self) -> None:
if self._running:
self._update_next(self.io_loop.time())
self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)

def _update_next(self, current_time: float) -> None:
callback_time_sec = self.callback_time / 1000.0
if self.jitter:
# apply jitter fraction
callback_time_sec *= 1 + (self.jitter * (random.random() - 0.5))
if self._next_timeout <= current_time:
# The period should be measured from the start of one call
# to the start of the next. If one call takes too long,
# skip cycles to get back to a multiple of the original
# schedule.
self._next_timeout += (
math.floor((current_time - self._next_timeout) / callback_time_sec)
+ 1
) * callback_time_sec
else:
# If the clock moved backwards, ensure we advance the next
# timeout instead of recomputing the same value again.
# This may result in long gaps between callbacks if the
# clock jumps backwards by a lot, but the far more common
# scenario is a small NTP adjustment that should just be
# ignored.
#
# Note that on some systems if time.time() runs slower
# than time.monotonic() (most common on windows), we
# effectively experience a small backwards time jump on
# every iteration because PeriodicCallback uses
# time.time() while asyncio schedules callbacks using
# time.monotonic().
# https://github.com/tornadoweb/tornado/issues/2333
self._next_timeout += callback_time_sec


_T = TypeVar("_T")

if sys.version_info >= (3, 12):
Expand Down Expand Up @@ -265,3 +96,17 @@ def _cancel_all_tasks(loop: asyncio.AbstractEventLoop) -> None:
"task": task,
}
)


def __getattr__(name: str) -> Any:
if name == "PeriodicCallback":
warnings.warn(
"distributed.compatibility.PeriodicCallback is deprecated and will be removed "
"in a future release; use tornado.ioloop.PeriodicCallback instead.",
DeprecationWarning,
stacklevel=2,
)
from tornado.ioloop import PeriodicCallback

return PeriodicCallback
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
3 changes: 1 addition & 2 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import tblib
from tlz import merge
from tornado.ioloop import IOLoop
from tornado.ioloop import IOLoop, PeriodicCallback

import dask
from dask.utils import parse_timedelta
Expand All @@ -46,7 +46,6 @@
unparse_host_port,
)
from distributed.comm.core import Listener
from distributed.compatibility import PeriodicCallback
from distributed.counter import Counter
from distributed.diskutils import WorkDir, WorkSpace
from distributed.metrics import context_meter, time
Expand Down
3 changes: 1 addition & 2 deletions distributed/deploy/adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
from inspect import isawaitable
from typing import TYPE_CHECKING, Any, Literal, cast

from tornado.ioloop import IOLoop
from tornado.ioloop import IOLoop, PeriodicCallback

import dask.config
from dask.utils import parse_timedelta

from distributed.compatibility import PeriodicCallback
from distributed.core import Status
from distributed.deploy.adaptive_core import AdaptiveCore
from distributed.protocol import pickle
Expand Down
3 changes: 1 addition & 2 deletions distributed/deploy/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@
from typing import Any

from packaging.version import parse as parse_version
from tornado.ioloop import IOLoop
from tornado.ioloop import IOLoop, PeriodicCallback

import dask.config
from dask.utils import _deprecated, format_bytes, parse_timedelta, typename
from dask.widgets import get_template

from distributed.compatibility import PeriodicCallback
from distributed.core import Status
from distributed.deploy.adaptive import Adaptive
from distributed.exceptions import WorkerStartTimeoutError
Expand Down
2 changes: 1 addition & 1 deletion distributed/diagnostics/memory_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from datetime import datetime
from typing import TYPE_CHECKING, Any

from distributed.compatibility import PeriodicCallback
from tornado.ioloop import PeriodicCallback

if TYPE_CHECKING:
# Optional runtime dependencies
Expand Down
3 changes: 1 addition & 2 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
take,
valmap,
)
from tornado.ioloop import IOLoop
from tornado.ioloop import IOLoop, PeriodicCallback

import dask
from dask._expr import LLGExpr
Expand Down Expand Up @@ -96,7 +96,6 @@
unparse_host_port,
)
from distributed.comm.addressing import addresses_from_user_args
from distributed.compatibility import PeriodicCallback
from distributed.core import (
ErrorMessage,
OKMessage,
Expand Down
3 changes: 2 additions & 1 deletion distributed/semaphore.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
from asyncio import TimeoutError
from collections import defaultdict, deque

from tornado.ioloop import PeriodicCallback

import dask
from dask.utils import parse_timedelta

from distributed.compatibility import PeriodicCallback
from distributed.metrics import time
from distributed.utils import Deadline, SyncMethodMixin, log_errors, wait_for
from distributed.utils_comm import retry_operation
Expand Down
2 changes: 1 addition & 1 deletion distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, cast

from tlz import topk
from tornado.ioloop import PeriodicCallback

import dask
from dask.typing import Key
from dask.utils import parse_timedelta

from distributed.compatibility import PeriodicCallback
from distributed.core import CommClosedError
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.utils import log_errors, recursive_to_dict
Expand Down
37 changes: 5 additions & 32 deletions distributed/tests/test_compatibility.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,9 @@
from __future__ import annotations

import random
from collections import Counter

import pytest

from distributed.compatibility import randbytes


def test_randbytes():
with pytest.warns(
DeprecationWarning,
match=r"randbytes is deprecated and will be removed in a future release; "
r"use random\.randbytes instead\.",
):
x = randbytes(256_000)
assert isinstance(x, bytes)
assert len(x) == 256_000
c = Counter(x)
for i in range(256):
assert 800 < c[i] < 1200, (i, c[i])

def test_deprecated_periodic_callback():
from tornado.ioloop import PeriodicCallback

def test_randbytes_seed():
state = random.getstate()
try:
random.seed(0)
with pytest.warns(
DeprecationWarning,
match=r"randbytes is deprecated and will be removed in a future release; "
r"use random\.randbytes instead\.",
):
assert randbytes(8) == b"\xcd\x07,\xd8\xbeo\x9fb"
finally:
random.setstate(state)
with pytest.warns(DeprecationWarning):
from distributed.compatibility import PeriodicCallback as compat
assert compat is PeriodicCallback
Loading
Loading