diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index b87e4022b23..ac87fca3e68 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -53,7 +53,7 @@ ) from distributed.metrics import time from distributed.protocol import pickle -from distributed.scheduler import Scheduler +from distributed.scheduler import KilledWorker, Scheduler from distributed.utils_test import ( NO_AMM, BlockedExecute, @@ -421,6 +421,44 @@ def __str__(self): assert "Bar" in str(e.__cause__) +@pytest.mark.slow +@pytest.mark.parametrize("sync", [True, False]) +@pytest.mark.parametrize( + "exc_type", [BaseException, SystemExit, KeyboardInterrupt, asyncio.CancelledError] +) +@gen_cluster( + nthreads=[("", 1)], + client=True, + Worker=Nanny, + config={"distributed.scheduler.allowed-failures": 0}, +) +async def test_base_exception_in_task(c, s, a, sync, exc_type): + if sync: + + def raiser(): + raise exc_type(f"this is a {exc_type}") + + else: + + async def raiser(): + raise exc_type(f"this is a {exc_type}") + + f = c.submit(raiser) + + try: + with pytest.raises( + KilledWorker if exc_type in (SystemExit, KeyboardInterrupt) else exc_type + ): + await f + except BaseException as e: + # Prevent test failure from killing the whole pytest process + traceback.print_exc() + pytest.fail(f"BaseException propagated back to test: {e!r}. See stdout.") + + # Nanny restarts it + await c.wait_for_workers(1) + + @gen_test() async def test_plugin_exception(): class MyPlugin: @@ -1547,7 +1585,7 @@ async def f(ev): task for task in asyncio.all_tasks() if "execute(f1)" in task.get_name() ) await a.close() - assert task.cancelled() + assert task.done() assert s.tasks["f1"].state in ("queued", "no-worker") diff --git a/distributed/worker.py b/distributed/worker.py index dc0b7168234..a0c5c85e2f3 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2293,8 +2293,24 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent: stimulus_id=f"task-finished-{time()}", ) - if isinstance(result["actual-exception"], Reschedule): + task_exc = result["actual-exception"] + if isinstance(task_exc, Reschedule): return RescheduleEvent(key=ts.key, stimulus_id=f"reschedule-{time()}") + if ( + self.status == Status.closing + and isinstance(task_exc, asyncio.CancelledError) + and iscoroutinefunction(function) + ): + # `Worker.cancel` will cause async user tasks to raise `CancelledError`. + # Since we cancelled those tasks, we shouldn't treat them as failures. + # This is just a heuristic; it's _possible_ the task happened to + # fail independently with `CancelledError`. + logger.info( + f"Async task {key!r} cancelled during worker close; rescheduling." + ) + return RescheduleEvent( + key=ts.key, stimulus_id=f"cancelled-by-worker-close-{time()}" + ) logger.warning( "Compute Failed\n" @@ -3014,7 +3030,18 @@ def apply_function_simple( start = time() try: result = function(*args, **kwargs) - except Exception as e: + except (SystemExit, KeyboardInterrupt): + # Special-case these, just like asyncio does all over the place. They will pass + # through `fail_hard` and `_handle_stimulus_from_task`, and eventually be caught + # by special-case logic in asyncio: + # https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82 + # Any other `BaseException` types would ultimately be ignored by asyncio if + # raised here, after messing up the worker state machine along their way. + raise + except BaseException as e: + # Users _shouldn't_ use `BaseException`s, but if they do, we can assume they + # aren't a reason to shut down the whole system (since we allow the + # system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through) msg = error_message(e) msg["op"] = "task-erred" msg["actual-exception"] = e @@ -3050,7 +3077,20 @@ async def apply_function_async( start = time() try: result = await function(*args, **kwargs) - except Exception as e: + except (SystemExit, KeyboardInterrupt): + # Special-case these, just like asyncio does all over the place. They will pass + # through `fail_hard` and `_handle_stimulus_from_task`, and eventually be caught + # by special-case logic in asyncio: + # https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82 + # Any other `BaseException` types would ultimately be ignored by asyncio if + # raised here, after messing up the worker state machine along their way. + raise + except BaseException as e: + # NOTE: this includes `CancelledError`! Since it's a user task, that's _not_ a + # reason to shut down the worker. + # Users _shouldn't_ use `BaseException`s, but if they do, we can assume they + # aren't a reason to shut down the whole system (since we allow the + # system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through) msg = error_message(e) msg["op"] = "task-erred" msg["actual-exception"] = e