From 19d8befd4c5b276f8b8c2d30a3a37033fadb3cce Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 22 Feb 2022 16:28:38 +0000 Subject: [PATCH 01/11] Fix flaky test_spill_hysteresis --- distributed/tests/test_worker.py | 162 +++++++++++++++++-------------- distributed/worker.py | 13 +-- 2 files changed, 94 insertions(+), 81 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 0bce544f05b..fcaf09bec5c 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1282,6 +1282,38 @@ async def test_spill_constrained(c, s, w): assert set(w.data.disk) == {x.key} +class BadSizeof: + """Configurable actual process memory and reported managed memory""" + + # dummy *args is to facilitate Client.map + def __init__(self, *args, process: float, managed: float, pickle_time: float = 0.5): + self.process = int(process) + self.managed = int(managed) + self.pickle_time = pickle_time + self.data = "x" * self.process + + def __sizeof__(self) -> int: + return self.managed + + def __getstate__(self): + """Do not rely on actual disk speed, which is sluggish and unpredictable on CI. + Also remove the impact of installing lz4 or blosc, which would shrink down + self.data to kilobytes. + + Make sure that the time spent writing to disk a key exceeds the time needed by + the OS kernel to react to the free() from the previous spilled key and shrink + down the process memory. + + In case of flakiness, read: https://github.com/dask/distributed/issues/5840 + """ + sleep(self.pickle_time) + return self.process, self.managed, self.pickle_time + + def __setstate__(self, state) -> None: + self.process, self.managed, self.pickle_time = state + self.data = "x" * self.process + + @requires_zict @gen_cluster( nthreads=[("", 1)], @@ -1306,44 +1338,36 @@ async def test_spill_spill_threshold(c, s, a): memory = psutil.Process().memory_info().rss a.memory_limit = (memory + 300e6) / 0.7 - class UnderReport: - """100 MB process memory, 10 bytes reported managed memory""" - - def __init__(self, *args): - self.data = "x" * int(100e6) - - def __sizeof__(self): - return 10 - - def __reduce__(self): - """Speed up test by writing very little to disk when spilling""" - return UnderReport, () - - futures = c.map(UnderReport, range(8)) - + futures = c.map(BadSizeof, range(8), process=100e6, managed=10, pickle_time=0) while not a.data.disk: await asyncio.sleep(0.01) -async def assert_not_everything_is_spilled(w: Worker) -> None: +async def assert_not_everything_is_spilled(dask_worker: Worker) -> None: + from distributed.spill import SpillBuffer + + assert isinstance(dask_worker.data, SpillBuffer) start = time() - while time() < start + 0.5: - assert w.data - if not w.data.memory: # type: ignore - # The hysteresis system fails on Windows and MacOSX because process memory - # is very slow to shrink down after calls to PyFree. As a result, - # Worker.memory_monitor will continue spilling until there's nothing left. - # Nothing we can do about this short of finding either a way to change this - # behaviour at OS level or a better measure of allocated memory. - assert not LINUX, "All data was spilled to disk" - raise pytest.xfail("https://github.com/dask/distributed/issues/5840") - await asyncio.sleep(0) + nspilled = 0 + # This timeout must be longer than the 0.5s in BadSizeof.__reduce__ + while time() < start + 1.0: + assert dask_worker.data + assert dask_worker.data.memory + if len(dask_worker.data.disk) != nspilled: + nspilled = len(dask_worker.data.disk) + start = time() # We just spilled; reset timer + await asyncio.sleep(0.01) +@pytest.mark.slow @requires_zict @gen_cluster( nthreads=[("", 1)], client=True, + # Run the test in a freshly spawned interpreter to ensure a clear memory situation, + # as opposed to the potentially heavily fragmented and unpredictable condition of + # the process used to run all the tests so far + Worker=Nanny, worker_kwargs=dict( # FIXME https://github.com/dask/distributed/issues/5367 # Can't reconfigure the absolute target threshold after the worker @@ -1357,42 +1381,45 @@ async def assert_not_everything_is_spilled(w: Worker) -> None: memory_pause_fraction=False, ), ) -async def test_spill_no_target_threshold(c, s, a): +async def test_spill_no_target_threshold(c, s, nanny): """Test that you can enable the spill threshold while leaving the target threshold to False """ - memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 300e6) / 0.7 # 300 MB before we start spilling - class OverReport: - """Configurable process memory, 10 GB reported managed memory""" + def change_memory_limit(dask_worker): + memory = psutil.Process().memory_info().rss + # 300 MB before we start spilling + dask_worker.memory_limit = (memory + 300e6) / 0.7 - def __init__(self, size): - self.data = "x" * size + await c.run(change_memory_limit) - def __sizeof__(self): - return int(10e9) + async def get_keys() -> tuple[set[str], set[str]]: + def _(dask_worker): + return set(dask_worker.data.memory), set(dask_worker.data.disk) - def __reduce__(self): - """Speed up test by writing very little to disk when spilling""" - return OverReport, (len(self.data),) + out = await c.run(_) + return out[nanny.worker_address] - f1 = c.submit(OverReport, 0, key="f1") + f1 = c.submit(inc, 0, key="f1") await wait(f1) - assert set(a.data.memory) == {"f1"} + mem, spilled = await get_keys() + assert mem == {"f1"} + assert not spilled - # 800 MB. Use large chunks to stimulate timely release of process memory. - futures = c.map(OverReport, range(int(100e6), int(100e6) + 8)) + # 800 MB process memory, 10 GB bogous managed memory. + # Use large chunks to stimulate timely release of process memory. + futures = c.map(BadSizeof, range(8), process=100e6, managed=10e9) - while not a.data.disk: + while not spilled: + mem, spilled = await get_keys() await asyncio.sleep(0.01) - assert "f1" in a.data.disk + assert "f1" in spilled # Spilling normally starts at the spill threshold and stops at the target threshold. # In this special case, it stops as soon as the process memory goes below the spill # threshold, e.g. without a hysteresis cycle. Test that we didn't instead dump the # whole data to disk (memory_limit * target = 0) - await assert_not_everything_is_spilled(a) + await c.run(assert_not_everything_is_spilled) @pytest.mark.slow @@ -1400,48 +1427,39 @@ def __reduce__(self): @gen_cluster( nthreads=[("", 1)], client=True, + # Run the test in a freshly spawned interpreter to ensure a clear memory situation, + # as opposed to the potentially heavily fragmented and unpredictable condition of + # the process used to run all the tests so far + Worker=Nanny, worker_kwargs=dict( - memory_limit="1 GiB", # See FIXME note in previous test + memory_limit="10 GiB", # See FIXME note in previous test memory_monitor_interval="10ms", memory_target_fraction=0.4, memory_spill_fraction=0.7, memory_pause_fraction=False, ), ) -async def test_spill_hysteresis(c, s, a): - memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB +async def test_spill_hysteresis(c, s, nanny): + def change_memory_limit(dask_worker): + memory = psutil.Process().memory_info().rss + dask_worker.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB + + await c.run(change_memory_limit) # Under-report managed memory, so that we reach the spill threshold for process # memory without first reaching the target threshold for managed memory - class UnderReport: - def __init__(self): - self.data = "x" * int(100e6) # 100 MB - - def __sizeof__(self): - return 1 - - def __reduce__(self): - """Speed up test by writing very little to disk when spilling""" - return UnderReport, () + futures = c.map(BadSizeof, range(11), process=100e6, managed=1) - max_in_memory = 0 - futures = [] - while not a.data.disk: - futures.append(c.submit(UnderReport, pure=False)) - max_in_memory = max(max_in_memory, len(a.data.memory)) - await wait(futures) - await asyncio.sleep(0.05) - max_in_memory = max(max_in_memory, len(a.data.memory)) + # Wait until spilling stops, but no less than 1s + await c.run(assert_not_everything_is_spilled) - # If there were no hysteresis, we would lose exactly 1 key. + # If there were no hysteresis, we would lose 1-2 keys. # Note that, for this test to be meaningful, memory must shrink down readily when # we deallocate Python objects. This is not always the case on Windows and MacOSX; # on Linux we set MALLOC_TRIM to help in that regard. # To verify that this test is useful, set target=spill and watch it fail. - while len(a.data.memory) > max_in_memory - 3: - await asyncio.sleep(0.01) - await assert_not_everything_is_spilled(a) + nspilled = await c.run(lambda dask_worker: len(dask_worker.data.disk)) + assert 3 < nspilled[nanny.worker_address] < 11 @pytest.mark.slow diff --git a/distributed/worker.py b/distributed/worker.py index 3f5761f7b9d..3b8738647f5 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3743,7 +3743,6 @@ def check_pause(memory): "Worker is at %.0f%% memory usage. Start spilling data to disk.", frac * 100, ) - start = time() # Implement hysteresis cycle where spilling starts at the spill threshold # and stops at the target threshold. Normally that here the target threshold # defines process memory, whereas normally it defines reported managed @@ -3768,18 +3767,14 @@ def check_pause(memory): break weight = self.data.evict() if weight == -1: - # Failed to evict: disk full, spill size limit exceeded, or pickle error + # Failed to evict: + # disk full, spill size limit exceeded, or pickle error break total += weight count += 1 - # If the current buffer is filled with a lot of small values, - # evicting one at a time is very slow and the worker might - # generate new data faster than it is able to evict. Therefore, - # only pass on control if we spent at least 0.5s evicting - if time() - start > 0.5: - await asyncio.sleep(0) - start = time() + await asyncio.sleep(0) + memory = proc.memory_info().rss if total > need and memory > target: # Issue a GC to ensure that the evicted data is actually From 8f2ffa2fabacad8d0013e0b53112f2a39ee770c3 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 22 Feb 2022 16:37:14 +0000 Subject: [PATCH 02/11] stress test --- .github/workflows/tests.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 7e087f99757..d1f8431977b 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -129,8 +129,8 @@ jobs: set -o pipefail mkdir reports - pytest distributed \ - -m "not avoid_ci and ${{ matrix.partition }}" --runslow \ + pytest distributed/tests/test_worker.py \ + -m "not avoid_ci" --runslow \ --leaks=fds,processes,threads \ --junitxml reports/pytest.xml -o junit_suite_name=$TEST_ID \ --cov=distributed --cov-report=xml \ From bddd7c3fa3489f765d91be4efd34dcd10027baea Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 22 Feb 2022 16:56:47 +0000 Subject: [PATCH 03/11] xfail --- distributed/tests/test_worker.py | 82 ++++++++++---------------------- 1 file changed, 25 insertions(+), 57 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index fcaf09bec5c..8734cbe4395 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1286,10 +1286,9 @@ class BadSizeof: """Configurable actual process memory and reported managed memory""" # dummy *args is to facilitate Client.map - def __init__(self, *args, process: float, managed: float, pickle_time: float = 0.5): + def __init__(self, *args, process: float, managed: float): self.process = int(process) self.managed = int(managed) - self.pickle_time = pickle_time self.data = "x" * self.process def __sizeof__(self) -> int: @@ -1299,18 +1298,11 @@ def __getstate__(self): """Do not rely on actual disk speed, which is sluggish and unpredictable on CI. Also remove the impact of installing lz4 or blosc, which would shrink down self.data to kilobytes. - - Make sure that the time spent writing to disk a key exceeds the time needed by - the OS kernel to react to the free() from the previous spilled key and shrink - down the process memory. - - In case of flakiness, read: https://github.com/dask/distributed/issues/5840 """ - sleep(self.pickle_time) - return self.process, self.managed, self.pickle_time + return self.process, self.managed def __setstate__(self, state) -> None: - self.process, self.managed, self.pickle_time = state + self.process, self.managed = state self.data = "x" * self.process @@ -1338,7 +1330,7 @@ async def test_spill_spill_threshold(c, s, a): memory = psutil.Process().memory_info().rss a.memory_limit = (memory + 300e6) / 0.7 - futures = c.map(BadSizeof, range(8), process=100e6, managed=10, pickle_time=0) + futures = c.map(BadSizeof, range(8), process=100e6, managed=10) while not a.data.disk: await asyncio.sleep(0.01) @@ -1349,25 +1341,20 @@ async def assert_not_everything_is_spilled(dask_worker: Worker) -> None: assert isinstance(dask_worker.data, SpillBuffer) start = time() nspilled = 0 - # This timeout must be longer than the 0.5s in BadSizeof.__reduce__ - while time() < start + 1.0: + while time() < start + 0.5: assert dask_worker.data - assert dask_worker.data.memory + if not dask_worker.data.memory: + raise pytest.xfail("https://github.com/dask/distributed/issues/5840") if len(dask_worker.data.disk) != nspilled: nspilled = len(dask_worker.data.disk) start = time() # We just spilled; reset timer await asyncio.sleep(0.01) -@pytest.mark.slow @requires_zict @gen_cluster( nthreads=[("", 1)], client=True, - # Run the test in a freshly spawned interpreter to ensure a clear memory situation, - # as opposed to the potentially heavily fragmented and unpredictable condition of - # the process used to run all the tests so far - Worker=Nanny, worker_kwargs=dict( # FIXME https://github.com/dask/distributed/issues/5367 # Can't reconfigure the absolute target threshold after the worker @@ -1381,56 +1368,38 @@ async def assert_not_everything_is_spilled(dask_worker: Worker) -> None: memory_pause_fraction=False, ), ) -async def test_spill_no_target_threshold(c, s, nanny): +async def test_spill_no_target_threshold(c, s, a): """Test that you can enable the spill threshold while leaving the target threshold to False """ - - def change_memory_limit(dask_worker): - memory = psutil.Process().memory_info().rss - # 300 MB before we start spilling - dask_worker.memory_limit = (memory + 300e6) / 0.7 - - await c.run(change_memory_limit) - - async def get_keys() -> tuple[set[str], set[str]]: - def _(dask_worker): - return set(dask_worker.data.memory), set(dask_worker.data.disk) - - out = await c.run(_) - return out[nanny.worker_address] + memory = psutil.Process().memory_info().rss + # 300 MB before we start spilling + a.memory_limit = (memory + 300e6) / 0.7 f1 = c.submit(inc, 0, key="f1") await wait(f1) - mem, spilled = await get_keys() - assert mem == {"f1"} - assert not spilled + assert set(a.data.memory) == {"f1"} + assert not a.data.disk # 800 MB process memory, 10 GB bogous managed memory. # Use large chunks to stimulate timely release of process memory. futures = c.map(BadSizeof, range(8), process=100e6, managed=10e9) - while not spilled: - mem, spilled = await get_keys() + while not a.data.disk: await asyncio.sleep(0.01) - assert "f1" in spilled + assert "f1" in a.data.disk # Spilling normally starts at the spill threshold and stops at the target threshold. # In this special case, it stops as soon as the process memory goes below the spill # threshold, e.g. without a hysteresis cycle. Test that we didn't instead dump the # whole data to disk (memory_limit * target = 0) - await c.run(assert_not_everything_is_spilled) + await assert_not_everything_is_spilled(a) -@pytest.mark.slow @requires_zict @gen_cluster( nthreads=[("", 1)], client=True, - # Run the test in a freshly spawned interpreter to ensure a clear memory situation, - # as opposed to the potentially heavily fragmented and unpredictable condition of - # the process used to run all the tests so far - Worker=Nanny, worker_kwargs=dict( memory_limit="10 GiB", # See FIXME note in previous test memory_monitor_interval="10ms", @@ -1439,27 +1408,26 @@ def _(dask_worker): memory_pause_fraction=False, ), ) -async def test_spill_hysteresis(c, s, nanny): - def change_memory_limit(dask_worker): - memory = psutil.Process().memory_info().rss - dask_worker.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB - - await c.run(change_memory_limit) +async def test_spill_hysteresis(c, s, a): + memory = psutil.Process().memory_info().rss + a.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB # Under-report managed memory, so that we reach the spill threshold for process # memory without first reaching the target threshold for managed memory futures = c.map(BadSizeof, range(11), process=100e6, managed=1) - # Wait until spilling stops, but no less than 1s - await c.run(assert_not_everything_is_spilled) + while not a.data.disk: + await asyncio.sleep(0.01) + + # Wait until spilling stops, but no less than 0.5s + await assert_not_everything_is_spilled(a) # If there were no hysteresis, we would lose 1-2 keys. # Note that, for this test to be meaningful, memory must shrink down readily when # we deallocate Python objects. This is not always the case on Windows and MacOSX; # on Linux we set MALLOC_TRIM to help in that regard. # To verify that this test is useful, set target=spill and watch it fail. - nspilled = await c.run(lambda dask_worker: len(dask_worker.data.disk)) - assert 3 < nspilled[nanny.worker_address] < 11 + assert 3 < len(a.data.disk) < 11 @pytest.mark.slow From df14e243d407de24c9cd3d736ad7882c23733d6f Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 23 Feb 2022 13:28:04 +0000 Subject: [PATCH 04/11] test resilience --- distributed/tests/test_worker.py | 42 ++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 8734cbe4395..81c53dcd170 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1344,7 +1344,10 @@ async def assert_not_everything_is_spilled(dask_worker: Worker) -> None: while time() < start + 0.5: assert dask_worker.data if not dask_worker.data.memory: - raise pytest.xfail("https://github.com/dask/distributed/issues/5840") + raise pytest.xfail( + "The whole contents of the SpillBuffer was spilled to disk: " + "https://github.com/dask/distributed/issues/5840" + ) if len(dask_worker.data.disk) != nspilled: nspilled = len(dask_worker.data.disk) start = time() # We just spilled; reset timer @@ -1361,7 +1364,7 @@ async def assert_not_everything_is_spilled(dask_worker: Worker) -> None: # started, so we're setting it here to something extremely small and then # increasing the memory_limit dynamically below in order to test the # spill threshold. - memory_limit=1, + memory_limit="1 kb", memory_monitor_interval="10ms", memory_target_fraction=False, memory_spill_fraction=0.7, @@ -1381,7 +1384,7 @@ async def test_spill_no_target_threshold(c, s, a): assert set(a.data.memory) == {"f1"} assert not a.data.disk - # 800 MB process memory, 10 GB bogous managed memory. + # 800 MB process memory, 80 GB bogus managed memory. # Use large chunks to stimulate timely release of process memory. futures = c.map(BadSizeof, range(8), process=100e6, managed=10e9) @@ -1397,37 +1400,50 @@ async def test_spill_no_target_threshold(c, s, a): @requires_zict +@pytest.mark.parametrize( + "memory_target_fraction,min_spill,max_spill", + [ + (0.7, 1, 3), # target=spill -> no hysteresis + (0.4, 4, 8), # target hysteresis + ], +) @gen_cluster( nthreads=[("", 1)], client=True, worker_kwargs=dict( memory_limit="10 GiB", # See FIXME note in previous test memory_monitor_interval="10ms", - memory_target_fraction=0.4, memory_spill_fraction=0.7, memory_pause_fraction=False, ), ) -async def test_spill_hysteresis(c, s, a): +async def test_spill_hysteresis(c, s, a, memory_target_fraction, min_spill, max_spill): memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB + a.memory_limit = (memory + 950e6) / 0.7 # Start spilling after 950MB + a.memory_target_fraction = memory_target_fraction # Under-report managed memory, so that we reach the spill threshold for process # memory without first reaching the target threshold for managed memory - futures = c.map(BadSizeof, range(11), process=100e6, managed=1) + futures = c.map(BadSizeof, range(10), process=100e6, managed=1) + start = time() while not a.data.disk: + # Different OSs/test environments can get stuck because of slightly different + # memory management algorithms. Add an extra 100MB every 2s to compensate. + # Can't do this too fast otherwise a very slow CI will fail on the number of + # elements actually spilled later. + if time() > start + 2: # pragma: nocover + print("Did not spill; adding a future") + futures.append(c.submit(BadSizeof, len(futures), process=100e6, managed=1)) + start = time() await asyncio.sleep(0.01) # Wait until spilling stops, but no less than 0.5s await assert_not_everything_is_spilled(a) - # If there were no hysteresis, we would lose 1-2 keys. - # Note that, for this test to be meaningful, memory must shrink down readily when - # we deallocate Python objects. This is not always the case on Windows and MacOSX; - # on Linux we set MALLOC_TRIM to help in that regard. - # To verify that this test is useful, set target=spill and watch it fail. - assert 3 < len(a.data.disk) < 11 + # Run this test twice, once with no hysteresis and another with hysteresis to show + # the different behaviour. + assert min_spill <= len(a.data.disk) <= max_spill @pytest.mark.slow From ad8803ecc8cd413c2eb667d8cb382e3c82bcc1c6 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 23 Feb 2022 15:25:59 +0000 Subject: [PATCH 05/11] redesign with nannies --- distributed/nanny.py | 14 ++- distributed/tests/test_worker.py | 185 +++++++++++++++---------------- 2 files changed, 97 insertions(+), 102 deletions(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 55c7838d8f3..65a2d303785 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -13,7 +13,7 @@ from inspect import isawaitable from queue import Empty from time import sleep as sync_sleep -from typing import ClassVar +from typing import TYPE_CHECKING, ClassVar, Literal import psutil from tornado import gen @@ -45,6 +45,9 @@ ) from .worker import Worker, parse_memory_limit, run +if TYPE_CHECKING: + from .diagnostics.plugin import NannyPlugin + logger = logging.getLogger(__name__) @@ -94,6 +97,7 @@ def __init__( services=None, name=None, memory_limit="auto", + memory_terminate_fraction: float | Literal[False] | None = None, reconnect=True, validate=False, quiet=False, @@ -203,8 +207,10 @@ def __init__( self.worker_kwargs = worker_kwargs self.contact_address = contact_address - self.memory_terminate_fraction = dask.config.get( - "distributed.worker.memory.terminate" + self.memory_terminate_fraction = ( + memory_terminate_fraction + if memory_terminate_fraction is not None + else dask.config.get("distributed.worker.memory.terminate") ) self.services = services @@ -231,7 +237,7 @@ def __init__( "plugin_remove": self.plugin_remove, } - self.plugins = {} + self.plugins: dict[str, NannyPlugin] = {} super().__init__( handlers=handlers, io_loop=self.loop, connection_args=self.connection_args diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 81c53dcd170..a6e193f0ad8 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3,6 +3,7 @@ import asyncio import importlib import logging +import math import os import sys import threading @@ -1335,115 +1336,103 @@ async def test_spill_spill_threshold(c, s, a): await asyncio.sleep(0.01) -async def assert_not_everything_is_spilled(dask_worker: Worker) -> None: - from distributed.spill import SpillBuffer - - assert isinstance(dask_worker.data, SpillBuffer) - start = time() - nspilled = 0 - while time() < start + 0.5: - assert dask_worker.data - if not dask_worker.data.memory: - raise pytest.xfail( - "The whole contents of the SpillBuffer was spilled to disk: " - "https://github.com/dask/distributed/issues/5840" - ) - if len(dask_worker.data.disk) != nspilled: - nspilled = len(dask_worker.data.disk) - start = time() # We just spilled; reset timer - await asyncio.sleep(0.01) - - -@requires_zict -@gen_cluster( - nthreads=[("", 1)], - client=True, - worker_kwargs=dict( - # FIXME https://github.com/dask/distributed/issues/5367 - # Can't reconfigure the absolute target threshold after the worker - # started, so we're setting it here to something extremely small and then - # increasing the memory_limit dynamically below in order to test the - # spill threshold. - memory_limit="1 kb", - memory_monitor_interval="10ms", - memory_target_fraction=False, - memory_spill_fraction=0.7, - memory_pause_fraction=False, - ), -) -async def test_spill_no_target_threshold(c, s, a): - """Test that you can enable the spill threshold while leaving the target threshold - to False - """ - memory = psutil.Process().memory_info().rss - # 300 MB before we start spilling - a.memory_limit = (memory + 300e6) / 0.7 - - f1 = c.submit(inc, 0, key="f1") - await wait(f1) - assert set(a.data.memory) == {"f1"} - assert not a.data.disk - - # 800 MB process memory, 80 GB bogus managed memory. - # Use large chunks to stimulate timely release of process memory. - futures = c.map(BadSizeof, range(8), process=100e6, managed=10e9) - - while not a.data.disk: - await asyncio.sleep(0.01) - assert "f1" in a.data.disk - - # Spilling normally starts at the spill threshold and stops at the target threshold. - # In this special case, it stops as soon as the process memory goes below the spill - # threshold, e.g. without a hysteresis cycle. Test that we didn't instead dump the - # whole data to disk (memory_limit * target = 0) - await assert_not_everything_is_spilled(a) - - +@pytest.mark.slow @requires_zict @pytest.mark.parametrize( - "memory_target_fraction,min_spill,max_spill", + "memory_target_fraction,managed,min_spill,max_spill", [ - (0.7, 1, 3), # target=spill -> no hysteresis - (0.4, 4, 8), # target hysteresis + # no target -> no hysteresis + # Over-report managed memory to test that the automated LRU eviction based on + # target is never triggered + (False, 10e9, 1, 3), + # Under-report managed memory, so that we reach the spill threshold for process + # memory without first reaching the target threshold for managed memory + # target == spill -> no hysteresis + (0.7, 1, 1, 3), + # target < spill -> hysteresis from spill to target + (0.4, 1, 4, 8), ], ) -@gen_cluster( - nthreads=[("", 1)], - client=True, - worker_kwargs=dict( - memory_limit="10 GiB", # See FIXME note in previous test +@gen_cluster(nthreads=[], client=True) +async def test_spill_hysteresis( + c, s, memory_target_fraction, managed, min_spill, max_spill +): + """ + 1. Test that you can enable the spill threshold while leaving the target threshold + to False + 2. Test the hysteresis system where, once you reach the spill threshold, the worker + won't stop spilling until the target threshold is reached + """ + # Run the test in a freshly spawned interpreter to ensure a clear memory situation, + # as opposed to the potentially heavily fragmented and unpredictable condition of + # the process used to run all the tests so far + async with Nanny( + s.address, + # Start spilling after ~950 MiB managed memory + # (assuming ~100 MiB unmanaged memory) + memory_limit="1500 MiB", memory_monitor_interval="10ms", + memory_target_fraction=memory_target_fraction, memory_spill_fraction=0.7, memory_pause_fraction=False, - ), -) -async def test_spill_hysteresis(c, s, a, memory_target_fraction, min_spill, max_spill): - memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 950e6) / 0.7 # Start spilling after 950MB - a.memory_target_fraction = memory_target_fraction - - # Under-report managed memory, so that we reach the spill threshold for process - # memory without first reaching the target threshold for managed memory - futures = c.map(BadSizeof, range(10), process=100e6, managed=1) + memory_terminate_fraction=False, + ) as nanny: + + async def nspilled() -> int: + out = await c.run(lambda dask_worker: len(dask_worker.data.disk)) + return out[nanny.worker_address] + + nfuts_for_spilling = math.ceil((1500 * 0.7 - s.memory.process / 2**20) / 100) + print(f"Process memory: {s.memory.process / 2**20:.0f} MiB") + print(f"Initial load: {nfuts_for_spilling} * 100 MiB") + assert nfuts_for_spilling > 6 + + # Add 100 MiB process memory. Spilling must not happen, even when managed=10GB + futures = [ + c.submit(BadSizeof, process=100 * 2**20, managed=managed, pure=False) + ] + await wait(futures) + await asyncio.sleep(0.2) + assert await nspilled() == 0 + + # Add another ~800MB process memory. This should start the spilling. + futures += c.map( + BadSizeof, + range(nfuts_for_spilling - 1), + process=100 * 2**20, + managed=managed, + ) - start = time() - while not a.data.disk: - # Different OSs/test environments can get stuck because of slightly different - # memory management algorithms. Add an extra 100MB every 2s to compensate. - # Can't do this too fast otherwise a very slow CI will fail on the number of - # elements actually spilled later. - if time() > start + 2: # pragma: nocover - print("Did not spill; adding a future") - futures.append(c.submit(BadSizeof, len(futures), process=100e6, managed=1)) - start = time() - await asyncio.sleep(0.01) + # Wait until spilling starts + start = time() + while not await nspilled(): + # Different OSs/test environments can get stuck because of slightly + # different memory management algorithms and base unmanaged memory. Add an + # extra 100MB every 0.5s to compensate. Can't do this too fast otherwise a + # very slow CI will fail on the number of elements actually spilled later. + if time() > start + 0.5: # pragma: nocover + print("Did not spill; adding a future") + futures.append( + c.submit(BadSizeof, process=100e6, managed=managed, pure=False) + ) + start = time() + await asyncio.sleep(0.1) - # Wait until spilling stops, but no less than 0.5s - await assert_not_everything_is_spilled(a) + # Wait until spilling stops + prev_n = -1 + while prev_n == -1 or time() < start + 0.5: + n = await nspilled() + if n == len(futures): + raise pytest.xfail( + "The whole contents of the SpillBuffer was spilled to disk; see " + "https://github.com/dask/distributed/issues/5840" + ) + if n != prev_n: + prev_n = n + start = time() # We just spilled; reset timer + await asyncio.sleep(0.1) - # Run this test twice, once with no hysteresis and another with hysteresis to show - # the different behaviour. - assert min_spill <= len(a.data.disk) <= max_spill + assert min_spill <= await nspilled() <= max_spill @pytest.mark.slow From 34a790a863fa6e782c30f9a7842f2493c074a9e5 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 23 Feb 2022 15:47:24 +0000 Subject: [PATCH 06/11] Stress test --- .github/workflows/tests.yaml | 12 ++---------- distributed/tests/test_worker.py | 22 +++++++++++++++++----- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index d1f8431977b..bbddd525e4f 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -24,13 +24,6 @@ jobs: matrix: os: [ubuntu-latest, windows-latest, macos-latest] python-version: ["3.8", "3.9"] - # Cherry-pick test modules to split the overall runtime roughly in half - partition: [ci1, not ci1] - include: - - partition: "ci1" - partition-label: "ci1" - - partition: "not ci1" - partition-label: "notci1" # Uncomment to stress-test the test suite for random failures. # Must also change env.TEST_ID below. @@ -38,11 +31,10 @@ jobs: # To avoid hamstringing other people, change 'on: [push, pull_request]' above # to just 'on: [push]'; this way the stress test will run exclusively in your # branch (https://github.com//distributed/actions). - # run: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] + run: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] env: - TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }} - # TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}-${{ matrix.run }} + TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.run }} steps: - name: Checkout source diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index a6e193f0ad8..a4e0deed308 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1389,7 +1389,12 @@ async def nspilled() -> int: # Add 100 MiB process memory. Spilling must not happen, even when managed=10GB futures = [ - c.submit(BadSizeof, process=100 * 2**20, managed=managed, pure=False) + c.submit( + BadSizeof, + process=100 * 2**20, + managed=managed, + pure=False, + ) ] await wait(futures) await asyncio.sleep(0.2) @@ -1413,7 +1418,12 @@ async def nspilled() -> int: if time() > start + 0.5: # pragma: nocover print("Did not spill; adding a future") futures.append( - c.submit(BadSizeof, process=100e6, managed=managed, pure=False) + c.submit( + BadSizeof, + process=100 * 2**20, + managed=managed, + pure=False, + ) ) start = time() await asyncio.sleep(0.1) @@ -1423,9 +1433,11 @@ async def nspilled() -> int: while prev_n == -1 or time() < start + 0.5: n = await nspilled() if n == len(futures): - raise pytest.xfail( - "The whole contents of the SpillBuffer was spilled to disk; see " - "https://github.com/dask/distributed/issues/5840" + # raise pytest.xfail( + raise AssertionError( + "The whole content of the SpillBuffer was spilled to disk; see " + "https://github.com/dask/distributed/issues/5840. " + "Consider converting this to xfail" ) if n != prev_n: prev_n = n From cb1f369c38785a5951a152bb32dd6430934f01c3 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 23 Feb 2022 16:23:23 +0000 Subject: [PATCH 07/11] xfail on MacOS --- distributed/tests/test_worker.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index a4e0deed308..975d7b9d16d 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -36,7 +36,7 @@ wait, ) from distributed.comm.registry import backends -from distributed.compatibility import LINUX, WINDOWS +from distributed.compatibility import LINUX, MACOS, WINDOWS from distributed.core import CommClosedError, Status, rpc from distributed.diagnostics import nvml from distributed.diagnostics.plugin import PipInstall @@ -1433,11 +1433,10 @@ async def nspilled() -> int: while prev_n == -1 or time() < start + 0.5: n = await nspilled() if n == len(futures): - # raise pytest.xfail( - raise AssertionError( + exc_cls = pytest.xfail if MACOS else AssertionError + raise exc_cls( "The whole content of the SpillBuffer was spilled to disk; see " - "https://github.com/dask/distributed/issues/5840. " - "Consider converting this to xfail" + "https://github.com/dask/distributed/issues/5840." ) if n != prev_n: prev_n = n From ab561c229887a4ff04713b74e2bd0bf059ff9d7d Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 23 Feb 2022 16:23:49 +0000 Subject: [PATCH 08/11] don't saturate dask org CI --- .github/workflows/tests.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index bbddd525e4f..7a47b1db825 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -2,9 +2,6 @@ name: Tests on: push: - pull_request: - schedule: - - cron: "0 6,18 * * *" # When this workflow is queued, automatically cancel any previous running # or pending jobs from the same branch From ef0e44b905dd787e0feb13fa9847d9a27c133cb8 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 23 Feb 2022 17:08:36 +0000 Subject: [PATCH 09/11] improve resilience of test_pause_executor --- distributed/tests/test_worker.py | 47 +++++++++++++++++--------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 975d7b9d16d..5685b3846b9 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1449,44 +1449,47 @@ async def nspilled() -> int: @pytest.mark.slow @gen_cluster( nthreads=[("", 1)], + # Run the test in a freshly spawned interpreter to ensure a clear memory situation, + # as opposed to the potentially heavily fragmented and unpredictable condition of + # the process used to run all the tests so far + Worker=Nanny, client=True, worker_kwargs=dict( + # pause after ~300 MiB, assuming ~100 MiB unmanaged memory + memory_limit="500 MiB", memory_monitor_interval="10ms", memory_target_fraction=False, memory_spill_fraction=False, memory_pause_fraction=0.8, + memory_terminate_fraction=False, ), ) -async def test_pause_executor(c, s, a): - # See notes in test_spill_spill_threshold - memory = psutil.Process().memory_info().rss - a.memory_limit = (memory + 160e6) / 0.8 # Pause after 200 MB +async def test_pause_executor(c, s, nanny): + ws = s.workers[nanny.worker_address] - # Note: it's crucial to have a very large single chunk of memory that gets descoped - # all at once in order to instigate release of process memory. - # Read: https://github.com/dask/distributed/issues/5840 def f(): - # Add 400 MB unmanaged memory - x = "x" * int(400e6) + # Add 1 GiB unmanaged memory + # Note: it's crucial to have a very large single chunk of memory that gets + # descoped all at once in order to instigate release of process memory. Read: + # https://github.com/dask/distributed/issues/5840 + x = "x" * 2**30 w = get_worker() while w.status != Status.paused: sleep(0.01) - with captured_logger(logging.getLogger("distributed.worker")) as logger: - future = c.submit(f, key="x") - futures = c.map(slowinc, range(30), delay=0.1) - - while a.status != Status.paused: - await asyncio.sleep(0.01) - - assert "Pausing worker" in logger.getvalue() - assert sum(f.status == "finished" for f in futures) < 4 + assert ws.status == Status.running + x = c.submit(f, key="x") + while "x" not in s.tasks: + await asyncio.sleep(0.01) + futures = c.map(slowinc, range(8), delay=0.1) - while a.status != Status.running: - await asyncio.sleep(0.01) + while ws.status != Status.paused: + await asyncio.sleep(0.01) - assert "Resuming worker" in logger.getvalue() - await wait(futures) + assert sum(f.status == "finished" for f in futures) < 4 + # Wait for unpause + await wait(futures) + assert ws.status == Status.running @gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "50 ms"}) From e484378a46a8be2fa842edf4106e9e8f151a277e Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 23 Feb 2022 17:08:36 +0000 Subject: [PATCH 10/11] Revert "improve resilience of test_pause_executor" This reverts commit ef0e44b905dd787e0feb13fa9847d9a27c133cb8. --- distributed/tests/test_worker.py | 47 +++++++++++++++----------------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 5685b3846b9..975d7b9d16d 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1449,47 +1449,44 @@ async def nspilled() -> int: @pytest.mark.slow @gen_cluster( nthreads=[("", 1)], - # Run the test in a freshly spawned interpreter to ensure a clear memory situation, - # as opposed to the potentially heavily fragmented and unpredictable condition of - # the process used to run all the tests so far - Worker=Nanny, client=True, worker_kwargs=dict( - # pause after ~300 MiB, assuming ~100 MiB unmanaged memory - memory_limit="500 MiB", memory_monitor_interval="10ms", memory_target_fraction=False, memory_spill_fraction=False, memory_pause_fraction=0.8, - memory_terminate_fraction=False, ), ) -async def test_pause_executor(c, s, nanny): - ws = s.workers[nanny.worker_address] +async def test_pause_executor(c, s, a): + # See notes in test_spill_spill_threshold + memory = psutil.Process().memory_info().rss + a.memory_limit = (memory + 160e6) / 0.8 # Pause after 200 MB + # Note: it's crucial to have a very large single chunk of memory that gets descoped + # all at once in order to instigate release of process memory. + # Read: https://github.com/dask/distributed/issues/5840 def f(): - # Add 1 GiB unmanaged memory - # Note: it's crucial to have a very large single chunk of memory that gets - # descoped all at once in order to instigate release of process memory. Read: - # https://github.com/dask/distributed/issues/5840 - x = "x" * 2**30 + # Add 400 MB unmanaged memory + x = "x" * int(400e6) w = get_worker() while w.status != Status.paused: sleep(0.01) - assert ws.status == Status.running - x = c.submit(f, key="x") - while "x" not in s.tasks: - await asyncio.sleep(0.01) - futures = c.map(slowinc, range(8), delay=0.1) + with captured_logger(logging.getLogger("distributed.worker")) as logger: + future = c.submit(f, key="x") + futures = c.map(slowinc, range(30), delay=0.1) - while ws.status != Status.paused: - await asyncio.sleep(0.01) + while a.status != Status.paused: + await asyncio.sleep(0.01) - assert sum(f.status == "finished" for f in futures) < 4 - # Wait for unpause - await wait(futures) - assert ws.status == Status.running + assert "Pausing worker" in logger.getvalue() + assert sum(f.status == "finished" for f in futures) < 4 + + while a.status != Status.running: + await asyncio.sleep(0.01) + + assert "Resuming worker" in logger.getvalue() + await wait(futures) @gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "50 ms"}) From 8152265602a2a7cce5d076e78eaa6df81e7609b7 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 24 Feb 2022 11:27:41 +0000 Subject: [PATCH 11/11] Revert stress test --- .github/workflows/tests.yaml | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 7a47b1db825..7e087f99757 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -2,6 +2,9 @@ name: Tests on: push: + pull_request: + schedule: + - cron: "0 6,18 * * *" # When this workflow is queued, automatically cancel any previous running # or pending jobs from the same branch @@ -21,6 +24,13 @@ jobs: matrix: os: [ubuntu-latest, windows-latest, macos-latest] python-version: ["3.8", "3.9"] + # Cherry-pick test modules to split the overall runtime roughly in half + partition: [ci1, not ci1] + include: + - partition: "ci1" + partition-label: "ci1" + - partition: "not ci1" + partition-label: "notci1" # Uncomment to stress-test the test suite for random failures. # Must also change env.TEST_ID below. @@ -28,10 +38,11 @@ jobs: # To avoid hamstringing other people, change 'on: [push, pull_request]' above # to just 'on: [push]'; this way the stress test will run exclusively in your # branch (https://github.com//distributed/actions). - run: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] + # run: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] env: - TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.run }} + TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }} + # TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}-${{ matrix.run }} steps: - name: Checkout source @@ -118,8 +129,8 @@ jobs: set -o pipefail mkdir reports - pytest distributed/tests/test_worker.py \ - -m "not avoid_ci" --runslow \ + pytest distributed \ + -m "not avoid_ci and ${{ matrix.partition }}" --runslow \ --leaks=fds,processes,threads \ --junitxml reports/pytest.xml -o junit_suite_name=$TEST_ID \ --cov=distributed --cov-report=xml \