diff --git a/distributed/nanny.py b/distributed/nanny.py index 55c7838d8f3..65a2d303785 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -13,7 +13,7 @@ from inspect import isawaitable from queue import Empty from time import sleep as sync_sleep -from typing import ClassVar +from typing import TYPE_CHECKING, ClassVar, Literal import psutil from tornado import gen @@ -45,6 +45,9 @@ ) from .worker import Worker, parse_memory_limit, run +if TYPE_CHECKING: + from .diagnostics.plugin import NannyPlugin + logger = logging.getLogger(__name__) @@ -94,6 +97,7 @@ def __init__( services=None, name=None, memory_limit="auto", + memory_terminate_fraction: float | Literal[False] | None = None, reconnect=True, validate=False, quiet=False, @@ -203,8 +207,10 @@ def __init__( self.worker_kwargs = worker_kwargs self.contact_address = contact_address - self.memory_terminate_fraction = dask.config.get( - "distributed.worker.memory.terminate" + self.memory_terminate_fraction = ( + memory_terminate_fraction + if memory_terminate_fraction is not None + else dask.config.get("distributed.worker.memory.terminate") ) self.services = services @@ -231,7 +237,7 @@ def __init__( "plugin_remove": self.plugin_remove, } - self.plugins = {} + self.plugins: dict[str, NannyPlugin] = {} super().__init__( handlers=handlers, io_loop=self.loop, connection_args=self.connection_args diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 0bce544f05b..975d7b9d16d 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3,6 +3,7 @@ import asyncio import importlib import logging +import math import os import sys import threading @@ -35,7 +36,7 @@ wait, ) from distributed.comm.registry import backends -from distributed.compatibility import LINUX, WINDOWS +from distributed.compatibility import LINUX, MACOS, WINDOWS from distributed.core import CommClosedError, Status, rpc from distributed.diagnostics import nvml from distributed.diagnostics.plugin import PipInstall @@ -1282,6 +1283,30 @@ async def test_spill_constrained(c, s, w): assert set(w.data.disk) == {x.key} +class BadSizeof: + """Configurable actual process memory and reported managed memory""" + + # dummy *args is to facilitate Client.map + def __init__(self, *args, process: float, managed: float): + self.process = int(process) + self.managed = int(managed) + self.data = "x" * self.process + + def __sizeof__(self) -> int: + return self.managed + + def __getstate__(self): + """Do not rely on actual disk speed, which is sluggish and unpredictable on CI. + Also remove the impact of installing lz4 or blosc, which would shrink down + self.data to kilobytes. + """ + return self.process, self.managed + + def __setstate__(self, state) -> None: + self.process, self.managed = state + self.data = "x" * self.process + + @requires_zict @gen_cluster( nthreads=[("", 1)], @@ -1306,142 +1331,119 @@ async def test_spill_spill_threshold(c, s, a): memory = psutil.Process().memory_info().rss a.memory_limit = (memory + 300e6) / 0.7 - class UnderReport: - """100 MB process memory, 10 bytes reported managed memory""" - - def __init__(self, *args): - self.data = "x" * int(100e6) - - def __sizeof__(self): - return 10 - - def __reduce__(self): - """Speed up test by writing very little to disk when spilling""" - return UnderReport, () - - futures = c.map(UnderReport, range(8)) - + futures = c.map(BadSizeof, range(8), process=100e6, managed=10) while not a.data.disk: await asyncio.sleep(0.01) -async def assert_not_everything_is_spilled(w: Worker) -> None: - start = time() - while time() < start + 0.5: - assert w.data - if not w.data.memory: # type: ignore - # The hysteresis system fails on Windows and MacOSX because process memory - # is very slow to shrink down after calls to PyFree. As a result, - # Worker.memory_monitor will continue spilling until there's nothing left. - # Nothing we can do about this short of finding either a way to change this - # behaviour at OS level or a better measure of allocated memory. - assert not LINUX, "All data was spilled to disk" - raise pytest.xfail("https://github.com/dask/distributed/issues/5840") - await asyncio.sleep(0) - - +@pytest.mark.slow @requires_zict -@gen_cluster( - nthreads=[("", 1)], - client=True, - worker_kwargs=dict( - # FIXME https://github.com/dask/distributed/issues/5367 - # Can't reconfigure the absolute target threshold after the worker - # started, so we're setting it here to something extremely small and then - # increasing the memory_limit dynamically below in order to test the - # spill threshold. - memory_limit=1, - memory_monitor_interval="10ms", - memory_target_fraction=False, - memory_spill_fraction=0.7, - memory_pause_fraction=False, - ), +@pytest.mark.parametrize( + "memory_target_fraction,managed,min_spill,max_spill", + [ + # no target -> no hysteresis + # Over-report managed memory to test that the automated LRU eviction based on + # target is never triggered + (False, 10e9, 1, 3), + # Under-report managed memory, so that we reach the spill threshold for process + # memory without first reaching the target threshold for managed memory + # target == spill -> no hysteresis + (0.7, 1, 1, 3), + # target < spill -> hysteresis from spill to target + (0.4, 1, 4, 8), + ], ) -async def test_spill_no_target_threshold(c, s, a): - """Test that you can enable the spill threshold while leaving the target threshold - to False +@gen_cluster(nthreads=[], client=True) +async def test_spill_hysteresis( + c, s, memory_target_fraction, managed, min_spill, max_spill +): """ - memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 300e6) / 0.7 # 300 MB before we start spilling - - class OverReport: - """Configurable process memory, 10 GB reported managed memory""" - - def __init__(self, size): - self.data = "x" * size - - def __sizeof__(self): - return int(10e9) - - def __reduce__(self): - """Speed up test by writing very little to disk when spilling""" - return OverReport, (len(self.data),) - - f1 = c.submit(OverReport, 0, key="f1") - await wait(f1) - assert set(a.data.memory) == {"f1"} - - # 800 MB. Use large chunks to stimulate timely release of process memory. - futures = c.map(OverReport, range(int(100e6), int(100e6) + 8)) - - while not a.data.disk: - await asyncio.sleep(0.01) - assert "f1" in a.data.disk - - # Spilling normally starts at the spill threshold and stops at the target threshold. - # In this special case, it stops as soon as the process memory goes below the spill - # threshold, e.g. without a hysteresis cycle. Test that we didn't instead dump the - # whole data to disk (memory_limit * target = 0) - await assert_not_everything_is_spilled(a) - - -@pytest.mark.slow -@requires_zict -@gen_cluster( - nthreads=[("", 1)], - client=True, - worker_kwargs=dict( - memory_limit="1 GiB", # See FIXME note in previous test + 1. Test that you can enable the spill threshold while leaving the target threshold + to False + 2. Test the hysteresis system where, once you reach the spill threshold, the worker + won't stop spilling until the target threshold is reached + """ + # Run the test in a freshly spawned interpreter to ensure a clear memory situation, + # as opposed to the potentially heavily fragmented and unpredictable condition of + # the process used to run all the tests so far + async with Nanny( + s.address, + # Start spilling after ~950 MiB managed memory + # (assuming ~100 MiB unmanaged memory) + memory_limit="1500 MiB", memory_monitor_interval="10ms", - memory_target_fraction=0.4, + memory_target_fraction=memory_target_fraction, memory_spill_fraction=0.7, memory_pause_fraction=False, - ), -) -async def test_spill_hysteresis(c, s, a): - memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB - - # Under-report managed memory, so that we reach the spill threshold for process - # memory without first reaching the target threshold for managed memory - class UnderReport: - def __init__(self): - self.data = "x" * int(100e6) # 100 MB + memory_terminate_fraction=False, + ) as nanny: + + async def nspilled() -> int: + out = await c.run(lambda dask_worker: len(dask_worker.data.disk)) + return out[nanny.worker_address] + + nfuts_for_spilling = math.ceil((1500 * 0.7 - s.memory.process / 2**20) / 100) + print(f"Process memory: {s.memory.process / 2**20:.0f} MiB") + print(f"Initial load: {nfuts_for_spilling} * 100 MiB") + assert nfuts_for_spilling > 6 + + # Add 100 MiB process memory. Spilling must not happen, even when managed=10GB + futures = [ + c.submit( + BadSizeof, + process=100 * 2**20, + managed=managed, + pure=False, + ) + ] + await wait(futures) + await asyncio.sleep(0.2) + assert await nspilled() == 0 + + # Add another ~800MB process memory. This should start the spilling. + futures += c.map( + BadSizeof, + range(nfuts_for_spilling - 1), + process=100 * 2**20, + managed=managed, + ) - def __sizeof__(self): - return 1 + # Wait until spilling starts + start = time() + while not await nspilled(): + # Different OSs/test environments can get stuck because of slightly + # different memory management algorithms and base unmanaged memory. Add an + # extra 100MB every 0.5s to compensate. Can't do this too fast otherwise a + # very slow CI will fail on the number of elements actually spilled later. + if time() > start + 0.5: # pragma: nocover + print("Did not spill; adding a future") + futures.append( + c.submit( + BadSizeof, + process=100 * 2**20, + managed=managed, + pure=False, + ) + ) + start = time() + await asyncio.sleep(0.1) - def __reduce__(self): - """Speed up test by writing very little to disk when spilling""" - return UnderReport, () + # Wait until spilling stops + prev_n = -1 + while prev_n == -1 or time() < start + 0.5: + n = await nspilled() + if n == len(futures): + exc_cls = pytest.xfail if MACOS else AssertionError + raise exc_cls( + "The whole content of the SpillBuffer was spilled to disk; see " + "https://github.com/dask/distributed/issues/5840." + ) + if n != prev_n: + prev_n = n + start = time() # We just spilled; reset timer + await asyncio.sleep(0.1) - max_in_memory = 0 - futures = [] - while not a.data.disk: - futures.append(c.submit(UnderReport, pure=False)) - max_in_memory = max(max_in_memory, len(a.data.memory)) - await wait(futures) - await asyncio.sleep(0.05) - max_in_memory = max(max_in_memory, len(a.data.memory)) - - # If there were no hysteresis, we would lose exactly 1 key. - # Note that, for this test to be meaningful, memory must shrink down readily when - # we deallocate Python objects. This is not always the case on Windows and MacOSX; - # on Linux we set MALLOC_TRIM to help in that regard. - # To verify that this test is useful, set target=spill and watch it fail. - while len(a.data.memory) > max_in_memory - 3: - await asyncio.sleep(0.01) - await assert_not_everything_is_spilled(a) + assert min_spill <= await nspilled() <= max_spill @pytest.mark.slow diff --git a/distributed/worker.py b/distributed/worker.py index 3195fe38e0c..6d33c21de2d 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3743,7 +3743,6 @@ def check_pause(memory): "Worker is at %.0f%% memory usage. Start spilling data to disk.", frac * 100, ) - start = time() # Implement hysteresis cycle where spilling starts at the spill threshold # and stops at the target threshold. Normally that here the target threshold # defines process memory, whereas normally it defines reported managed @@ -3768,18 +3767,14 @@ def check_pause(memory): break weight = self.data.evict() if weight == -1: - # Failed to evict: disk full, spill size limit exceeded, or pickle error + # Failed to evict: + # disk full, spill size limit exceeded, or pickle error break total += weight count += 1 - # If the current buffer is filled with a lot of small values, - # evicting one at a time is very slow and the worker might - # generate new data faster than it is able to evict. Therefore, - # only pass on control if we spent at least 0.5s evicting - if time() - start > 0.5: - await asyncio.sleep(0) - start = time() + await asyncio.sleep(0) + memory = proc.memory_info().rss if total > need and memory > target: # Issue a GC to ensure that the evicted data is actually