From 6b588386b632c2599e491d02c2a456d608dc5a02 Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 1 Mar 2022 15:17:26 +0100 Subject: [PATCH 1/4] Ensure event loop does not block while Worker is shutting down --- distributed/cli/tests/test_dask_worker.py | 26 +---------- distributed/compatibility.py | 25 +++++++++++ distributed/tests/test_worker.py | 55 ++++++++++++++++++++++- distributed/worker.py | 20 ++++++--- 4 files changed, 95 insertions(+), 31 deletions(-) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 449b406c928..04e31847609 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -1,7 +1,4 @@ import asyncio -import contextvars -import functools -import sys import pytest from click.testing import CliRunner @@ -18,33 +15,12 @@ import distributed.cli.dask_worker from distributed import Client -from distributed.compatibility import LINUX +from distributed.compatibility import LINUX, to_thread from distributed.deploy.utils import nprocesses_nthreads from distributed.metrics import time from distributed.utils import parse_ports, sync from distributed.utils_test import gen_cluster, popen, requires_ipv6 -if sys.version_info >= (3, 9): - from asyncio import to_thread -else: - - async def to_thread(*func_args, **kwargs): - """Asynchronously run function *func* in a separate thread. - Any *args and **kwargs supplied for this function are directly passed - to *func*. Also, the current :class:`contextvars.Context` is propagated, - allowing context variables from the main thread to be accessed in the - separate thread. - Return a coroutine that can be awaited to get the eventual result of *func*. - - backport from - https://github.com/python/cpython/blob/3f1ea163ea54513e00e0e9d5442fee1b639825cc/Lib/asyncio/threads.py#L12-L25 - """ - func, *args = func_args - loop = asyncio.get_running_loop() - ctx = contextvars.copy_context() - func_call = functools.partial(ctx.run, func, *args, **kwargs) - return await loop.run_in_executor(None, func_call) - def test_nanny_worker_ports(loop): with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]): diff --git a/distributed/compatibility.py b/distributed/compatibility.py index 352667ec2bd..c2097b8ab77 100644 --- a/distributed/compatibility.py +++ b/distributed/compatibility.py @@ -1,5 +1,8 @@ from __future__ import annotations +import asyncio +import contextvars +import functools import logging import platform import sys @@ -12,3 +15,25 @@ LINUX = sys.platform == "linux" MACOS = sys.platform == "darwin" WINDOWS = sys.platform.startswith("win") + + +if sys.version_info >= (3, 9): + from asyncio import to_thread +else: + + async def to_thread(*func_args, **kwargs): + """Asynchronously run function *func* in a separate thread. + Any *args and **kwargs supplied for this function are directly passed + to *func*. Also, the current :class:`contextvars.Context` is propagated, + allowing context variables from the main thread to be accessed in the + separate thread. + Return a coroutine that can be awaited to get the eventual result of *func*. + + backport from + https://github.com/python/cpython/blob/3f1ea163ea54513e00e0e9d5442fee1b639825cc/Lib/asyncio/threads.py#L12-L25 + """ + func, *args = func_args + loop = asyncio.get_running_loop() + ctx = contextvars.copy_context() + func_call = functools.partial(ctx.run, func, *args, **kwargs) + return await loop.run_in_executor(None, func_call) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 2cb36819713..40628e29734 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -37,7 +37,7 @@ ) from distributed.comm.registry import backends from distributed.compatibility import LINUX, WINDOWS -from distributed.core import CommClosedError, Status, rpc +from distributed.core import CommClosedError, ConnectionPool, Status, rpc from distributed.diagnostics import nvml from distributed.diagnostics.plugin import PipInstall from distributed.metrics import time @@ -3805,3 +3805,56 @@ def test_unique_task_heap(): assert heap.pop() == ts assert repr(heap) == "" + + +@gen_test( + # The timeout value here must be strictly larger than the default value for + # the timeout in `Worker.close`. The entire test should finish in <1s if + # everything works as intended but we want to make sure to not obfuscate + # another timeout therefore we increase this number. + timeout=60, +) +async def test_do_not_block_event_loop_during_shutdown(): + async with Scheduler() as s: + async with Client(s.address, asynchronous=True) as c: + block_handler = asyncio.Event() + block_handler.clear() + called_handler = asyncio.Event() + called_handler.clear() + timeout_hit = False + + async def block_echo(x): + nonlocal timeout_hit + called_handler.set() + try: + await asyncio.wait_for(block_handler.wait(), 0.1) + except TimeoutError: + timeout_hit = True + x = False + return x + + s.handlers["block_echo"] = block_echo + w = await Worker(s.address) + addr = s.address + + def f(x): + # This function calls into a handler on the remote which is + # blocked. This test eventually times out and the function + # should return and finish successfully, unblocking the + # ThreadPool + loop = get_worker().loop + from distributed.utils import sync + + async def _(): + pool = await ConnectionPool() + scheduler_rpc = pool(addr) + assert not await scheduler_rpc.block_echo(x=x) + + return sync(loop, _) + + fut = c.submit(f, True) + await called_handler.wait() + # executor_wait is True by default but we want to be explicit here + await w.close(executor_wait=True) + + assert timeout_hit diff --git a/distributed/worker.py b/distributed/worker.py index eb0d07c6505..2736755c14e 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -45,6 +45,8 @@ typename, ) +from distributed.compatibility import to_thread + from . import comm, preloading, profile, system, utils from .batched import BatchedSend from .comm import Comm, connect, get_address_host @@ -1735,11 +1737,19 @@ async def close( for executor in self.executors.values(): if executor is utils._offload_executor: continue # Never shutdown the offload executor - if isinstance(executor, ThreadPoolExecutor): - executor._work_queue.queue.clear() - executor.shutdown(wait=executor_wait, timeout=timeout) - else: - executor.shutdown(wait=executor_wait) + + def _close(): + if isinstance(executor, ThreadPoolExecutor): + executor._work_queue.queue.clear() + executor.shutdown(wait=executor_wait, timeout=timeout) + else: + executor.shutdown(wait=executor_wait) + + # Waiting for the shutdown can block the event loop causing + # weird deadlocks particularly if the task that is executing in + # the thread is waiting for a server reply, e.g. when using + # worker clients, semaphores, etc. + await to_thread(_close) self.stop() await self.rpc.close() From 59461caa49de4d737c7e1aae56a651c37db20840 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 2 Mar 2022 12:05:41 +0100 Subject: [PATCH 2/4] Review comments --- distributed/compatibility.py | 12 ++++++------ distributed/tests/test_worker.py | 27 ++++++++++++++------------- distributed/worker.py | 3 +-- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/distributed/compatibility.py b/distributed/compatibility.py index c2097b8ab77..32c94151d55 100644 --- a/distributed/compatibility.py +++ b/distributed/compatibility.py @@ -1,8 +1,5 @@ from __future__ import annotations -import asyncio -import contextvars -import functools import logging import platform import sys @@ -20,20 +17,23 @@ if sys.version_info >= (3, 9): from asyncio import to_thread else: + import contextvars + import functools + from asyncio import events - async def to_thread(*func_args, **kwargs): + async def to_thread(func, /, *args, **kwargs): """Asynchronously run function *func* in a separate thread. Any *args and **kwargs supplied for this function are directly passed to *func*. Also, the current :class:`contextvars.Context` is propagated, allowing context variables from the main thread to be accessed in the separate thread. + Return a coroutine that can be awaited to get the eventual result of *func*. backport from https://github.com/python/cpython/blob/3f1ea163ea54513e00e0e9d5442fee1b639825cc/Lib/asyncio/threads.py#L12-L25 """ - func, *args = func_args - loop = asyncio.get_running_loop() + loop = events.get_running_loop() ctx = contextvars.copy_context() func_call = functools.partial(ctx.run, func, *args, **kwargs) return await loop.run_in_executor(None, func_call) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 40628e29734..c043a592090 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -43,7 +43,7 @@ from distributed.metrics import time from distributed.protocol import pickle from distributed.scheduler import Scheduler -from distributed.utils import TimeoutError +from distributed.utils import TimeoutError, sync from distributed.utils_test import ( TaskStateMetadataPlugin, _LockedCommPool, @@ -3821,16 +3821,15 @@ async def test_do_not_block_event_loop_during_shutdown(): block_handler.clear() called_handler = asyncio.Event() called_handler.clear() - timeout_hit = False + handler_continued = False async def block_echo(x): - nonlocal timeout_hit + nonlocal handler_continued called_handler.set() - try: - await asyncio.wait_for(block_handler.wait(), 0.1) - except TimeoutError: - timeout_hit = True - x = False + await asyncio.sleep(0.1) + # If the event loop is blocked, we'll never hit this before the + # scheduler is closed + handler_continued = True return x s.handlers["block_echo"] = block_echo @@ -3843,18 +3842,20 @@ def f(x): # should return and finish successfully, unblocking the # ThreadPool loop = get_worker().loop - from distributed.utils import sync async def _(): pool = await ConnectionPool() scheduler_rpc = pool(addr) - assert not await scheduler_rpc.block_echo(x=x) + assert await scheduler_rpc.block_echo(x=x) - return sync(loop, _) + sync(loop, _) + # We can never receive the result of the future after the worker + # closes but we'll need to keep a ref to the future to not have it + # cancelled immediately fut = c.submit(f, True) await called_handler.wait() # executor_wait is True by default but we want to be explicit here await w.close(executor_wait=True) - - assert timeout_hit + del fut + assert handler_continued diff --git a/distributed/worker.py b/distributed/worker.py index 2736755c14e..589ab3aee54 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -45,13 +45,12 @@ typename, ) -from distributed.compatibility import to_thread - from . import comm, preloading, profile, system, utils from .batched import BatchedSend from .comm import Comm, connect, get_address_host from .comm.addressing import address_from_user_args, parse_address from .comm.utils import OFFLOAD_THRESHOLD +from .compatibility import to_thread from .core import ( CommClosedError, Status, From 34208d6cbe5570f6948335308184fc42d1c183dc Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 2 Mar 2022 13:46:50 +0100 Subject: [PATCH 3/4] More elegant unit test --- distributed/tests/test_worker.py | 86 ++++++++++++-------------------- 1 file changed, 33 insertions(+), 53 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index c043a592090..9f0f25ec063 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -37,13 +37,13 @@ ) from distributed.comm.registry import backends from distributed.compatibility import LINUX, WINDOWS -from distributed.core import CommClosedError, ConnectionPool, Status, rpc +from distributed.core import CommClosedError, Status, rpc from distributed.diagnostics import nvml from distributed.diagnostics.plugin import PipInstall from distributed.metrics import time from distributed.protocol import pickle from distributed.scheduler import Scheduler -from distributed.utils import TimeoutError, sync +from distributed.utils import TimeoutError from distributed.utils_test import ( TaskStateMetadataPlugin, _LockedCommPool, @@ -3807,55 +3807,35 @@ def test_unique_task_heap(): assert repr(heap) == "" -@gen_test( - # The timeout value here must be strictly larger than the default value for - # the timeout in `Worker.close`. The entire test should finish in <1s if - # everything works as intended but we want to make sure to not obfuscate - # another timeout therefore we increase this number. - timeout=60, -) -async def test_do_not_block_event_loop_during_shutdown(): - async with Scheduler() as s: - async with Client(s.address, asynchronous=True) as c: - block_handler = asyncio.Event() - block_handler.clear() - called_handler = asyncio.Event() - called_handler.clear() - handler_continued = False - - async def block_echo(x): - nonlocal handler_continued - called_handler.set() - await asyncio.sleep(0.1) - # If the event loop is blocked, we'll never hit this before the - # scheduler is closed - handler_continued = True - return x +@gen_cluster(client=True, nthreads=[]) +async def test_do_not_block_event_loop_during_shutdown(c, s): + loop = asyncio.get_running_loop() + called_handler = threading.Event() + block_handler = threading.Event() - s.handlers["block_echo"] = block_echo - w = await Worker(s.address) - addr = s.address - - def f(x): - # This function calls into a handler on the remote which is - # blocked. This test eventually times out and the function - # should return and finish successfully, unblocking the - # ThreadPool - loop = get_worker().loop - - async def _(): - pool = await ConnectionPool() - scheduler_rpc = pool(addr) - assert await scheduler_rpc.block_echo(x=x) - - sync(loop, _) - - # We can never receive the result of the future after the worker - # closes but we'll need to keep a ref to the future to not have it - # cancelled immediately - fut = c.submit(f, True) - await called_handler.wait() - # executor_wait is True by default but we want to be explicit here - await w.close(executor_wait=True) - del fut - assert handler_continued + w = await Worker(s.address) + executor = w.executors["default"] + + # The block wait must be smaller than the test timeout and smaller than the + # default value for timeout in `Worker.close`` + async def block(): + def fn(): + called_handler.set() + assert block_handler.wait(20) + + await loop.run_in_executor(executor, fn) + + async def set_future(): + while True: + try: + await loop.run_in_executor(executor, sleep, 0.1) + except RuntimeError: # executor has started shutting down + block_handler.set() + return + + async def close(): + called_handler.wait() + # executor_wait is True by default but we want to be explicit here + await w.close(executor_wait=True) + + await asyncio.gather(block(), close(), set_future()) From c6083152723e389686cf28d28ea2cf73e652cb54 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 2 Mar 2022 14:01:03 +0100 Subject: [PATCH 4/4] Remove Client --- distributed/tests/test_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 9f0f25ec063..c75d8abd7ea 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3807,8 +3807,8 @@ def test_unique_task_heap(): assert repr(heap) == "" -@gen_cluster(client=True, nthreads=[]) -async def test_do_not_block_event_loop_during_shutdown(c, s): +@gen_cluster(nthreads=[]) +async def test_do_not_block_event_loop_during_shutdown(s): loop = asyncio.get_running_loop() called_handler = threading.Event() block_handler = threading.Event()