From bfec8aaa235751764319debfa3188de939c3903f Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 24 Mar 2022 17:16:51 -0600 Subject: [PATCH 1/9] Catch `BaseException`s from user tasks --- distributed/tests/test_worker.py | 16 ++++++++++++++++ distributed/worker.py | 7 ++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index b87e4022b23..528933e43d4 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -421,6 +421,22 @@ def __str__(self): assert "Bar" in str(e.__cause__) +@gen_cluster(client=True, nthreads=[("", 1)]) +async def test_baseexception_in_task(c, s, a): + class CustomBaseException(BaseException): + # This is exactly what Python says you shouldn't do, but users may do it anyway. + pass + + def bad_task(): + raise CustomBaseException("foo") + + f = c.submit(bad_task) + with pytest.raises(CustomBaseException, match="foo"): + await f + + assert not a.active_threads + + @gen_test() async def test_plugin_exception(): class MyPlugin: diff --git a/distributed/worker.py b/distributed/worker.py index 3a6e76fea46..b07913e3ffe 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3015,7 +3015,10 @@ def apply_function_simple( start = time() try: result = function(*args, **kwargs) - except Exception as e: + except BaseException as e: + # NOTE: SIGINT would always raise `KeyboardInterrupt` in the main thread, so we can assume + # `BaseException`s come from user code. Users _shouldn't_ use `BaseException`s, but if they do, we can + # assume they aren't a reason to shut down the whole system. This does block `sys.exit()` from user code. msg = error_message(e) msg["op"] = "task-erred" msg["actual-exception"] = e @@ -3052,6 +3055,8 @@ async def apply_function_async( try: result = await function(*args, **kwargs) except Exception as e: + # NOTE: we don't catch `BaseException`s because we're running in the main thread, + # plus `asyncio.CancelledError` is a `BaseException` msg = error_message(e) msg["op"] = "task-erred" msg["actual-exception"] = e From d5327c16482024498bc4e6f5dce6660f728f21c8 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 17 Nov 2022 20:15:25 -0700 Subject: [PATCH 2/9] special-case SystemExit, KeyboardInterrupt --- distributed/worker.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index b07913e3ffe..535cbaff22c 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3015,10 +3015,17 @@ def apply_function_simple( start = time() try: result = function(*args, **kwargs) + except (SystemExit, KeyboardInterrupt): + # Special-case these, just like asyncio does all over the place. They will pass through `fail_hard`, + # pass through `_handle_stimulus_from_task`, and eventually be caught by special-case logic in asyncio: + # https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82 + # Note that any other `BaseException` types would ultimately be ignored by asyncio if raised here, + # after messing up the worker state machine along their way. + raise except BaseException as e: - # NOTE: SIGINT would always raise `KeyboardInterrupt` in the main thread, so we can assume - # `BaseException`s come from user code. Users _shouldn't_ use `BaseException`s, but if they do, we can - # assume they aren't a reason to shut down the whole system. This does block `sys.exit()` from user code. + # Users _shouldn't_ use `BaseException`s, but if they do, we can assume they + # aren't a reason to shut down the whole system (since we allow the + # system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through) msg = error_message(e) msg["op"] = "task-erred" msg["actual-exception"] = e From 989394e30aa3d95be48a8aa4380c4e3dd404964d Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 17 Nov 2022 20:19:56 -0700 Subject: [PATCH 3/9] replace with test from #7330 --- distributed/tests/test_worker.py | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 528933e43d4..6f364412bd4 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -53,7 +53,7 @@ ) from distributed.metrics import time from distributed.protocol import pickle -from distributed.scheduler import Scheduler +from distributed.scheduler import KilledWorker, Scheduler from distributed.utils_test import ( NO_AMM, BlockedExecute, @@ -421,20 +421,29 @@ def __str__(self): assert "Bar" in str(e.__cause__) -@gen_cluster(client=True, nthreads=[("", 1)]) -async def test_baseexception_in_task(c, s, a): - class CustomBaseException(BaseException): - # This is exactly what Python says you shouldn't do, but users may do it anyway. - pass +@pytest.mark.slow +@pytest.mark.parametrize( + "exc_type", [BaseException, SystemExit, KeyboardInterrupt, asyncio.CancelledError] +) +@gen_cluster( + nthreads=[("", 1)], + client=True, + Worker=Nanny, + config={"distributed.scheduler.allowed-failures": 0}, +) +async def test_base_exception_in_task(c, s, a, exc_type): + def raiser(): + raise exc_type(f"this is a {exc_type}") - def bad_task(): - raise CustomBaseException("foo") + f = c.submit(raiser) - f = c.submit(bad_task) - with pytest.raises(CustomBaseException, match="foo"): + with pytest.raises( + KilledWorker if exc_type in (SystemExit, KeyboardInterrupt) else exc_type + ): await f - assert not a.active_threads + # Nanny restarts it + await c.wait_for_workers(1) @gen_test() From 540f45220055ec2a84975e419894c9f66212d7d3 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 17 Nov 2022 20:29:47 -0700 Subject: [PATCH 4/9] match policy for async tasks --- distributed/tests/test_worker.py | 14 +++++++++++--- distributed/worker.py | 26 +++++++++++++++++++------- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 6f364412bd4..a2812a08ecb 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -422,6 +422,7 @@ def __str__(self): @pytest.mark.slow +@pytest.mark.parametrize("sync", [True, False]) @pytest.mark.parametrize( "exc_type", [BaseException, SystemExit, KeyboardInterrupt, asyncio.CancelledError] ) @@ -431,9 +432,16 @@ def __str__(self): Worker=Nanny, config={"distributed.scheduler.allowed-failures": 0}, ) -async def test_base_exception_in_task(c, s, a, exc_type): - def raiser(): - raise exc_type(f"this is a {exc_type}") +async def test_base_exception_in_task(c, s, a, sync, exc_type): + if sync: + + def raiser(): + raise exc_type(f"this is a {exc_type}") + + else: + + async def raiser(): + raise exc_type(f"this is a {exc_type}") f = c.submit(raiser) diff --git a/distributed/worker.py b/distributed/worker.py index 535cbaff22c..d76c7da7faa 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3016,11 +3016,12 @@ def apply_function_simple( try: result = function(*args, **kwargs) except (SystemExit, KeyboardInterrupt): - # Special-case these, just like asyncio does all over the place. They will pass through `fail_hard`, - # pass through `_handle_stimulus_from_task`, and eventually be caught by special-case logic in asyncio: + # Special-case these, just like asyncio does all over the place. They will pass + # through `fail_hard` and `_handle_stimulus_from_task`, and eventually be caught + # by special-case logic in asyncio: # https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82 - # Note that any other `BaseException` types would ultimately be ignored by asyncio if raised here, - # after messing up the worker state machine along their way. + # Any other `BaseException` types would ultimately be ignored by asyncio if + # raised here, after messing up the worker state machine along their way. raise except BaseException as e: # Users _shouldn't_ use `BaseException`s, but if they do, we can assume they @@ -3061,9 +3062,20 @@ async def apply_function_async( start = time() try: result = await function(*args, **kwargs) - except Exception as e: - # NOTE: we don't catch `BaseException`s because we're running in the main thread, - # plus `asyncio.CancelledError` is a `BaseException` + except (SystemExit, KeyboardInterrupt): + # Special-case these, just like asyncio does all over the place. They will pass + # through `fail_hard` and `_handle_stimulus_from_task`, and eventually be caught + # by special-case logic in asyncio: + # https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82 + # Any other `BaseException` types would ultimately be ignored by asyncio if + # raised here, after messing up the worker state machine along their way. + raise + except BaseException as e: + # NOTE: this includes `CancelledError`! Since it's a user task, that's _not_ a + # reason to shut down the worker. + # Users _shouldn't_ use `BaseException`s, but if they do, we can assume they + # aren't a reason to shut down the whole system (since we allow the + # system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through) msg = error_message(e) msg["op"] = "task-erred" msg["actual-exception"] = e From 1e3cec0c606a895df34d8817f6a13ec74f0619e1 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 17 Nov 2022 20:32:17 -0700 Subject: [PATCH 5/9] guard failed test killing CI --- distributed/tests/test_worker.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index a2812a08ecb..aa51a40edb5 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -445,10 +445,15 @@ async def raiser(): f = c.submit(raiser) - with pytest.raises( - KilledWorker if exc_type in (SystemExit, KeyboardInterrupt) else exc_type - ): - await f + try: + with pytest.raises( + KilledWorker if exc_type in (SystemExit, KeyboardInterrupt) else exc_type + ): + await f + except BaseException as e: + # Prevent test failure from killing the whole pytest process + traceback.print_exc() + pytest.fail(f"BaseException propagated back to test: {e!r}. See stdout.") # Nanny restarts it await c.wait_for_workers(1) From 0dd9abaf1ea4badd947232e4a535883620f31427 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 17 Nov 2022 21:09:36 -0700 Subject: [PATCH 6/9] task may have happily failed on its own --- distributed/tests/test_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index aa51a40edb5..ac87fca3e68 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1585,7 +1585,7 @@ async def f(ev): task for task in asyncio.all_tasks() if "execute(f1)" in task.get_name() ) await a.close() - assert task.cancelled() + assert task.done() assert s.tasks["f1"].state in ("queued", "no-worker") From 67ef4d6ae17f9b9223bbecc291dc501eb8c04b0c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 17 Nov 2022 21:41:31 -0700 Subject: [PATCH 7/9] hack around cancelled user async tasks --- distributed/worker.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index d76c7da7faa..82a97acdebf 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2294,7 +2294,14 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent: stimulus_id=f"task-finished-{time()}", ) - if isinstance(result["actual-exception"], Reschedule): + task_exc = result["actual-exception"] + if isinstance(task_exc, Reschedule) or ( + self.status == Status.closing + and isinstance(task_exc, asyncio.CancelledError) + and iscoroutinefunction(function) + ): + # HACK: `Worker.cancel` will cause async user tasks will raise `CancelledError`. + # Since we cancelled those tasks, we shouldn't treat them as failures. return RescheduleEvent(key=ts.key, stimulus_id=f"reschedule-{time()}") logger.warning( From 5b13a6706d8bce82d4ea345377cdc9e78c0e5536 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 17 Nov 2022 21:50:10 -0700 Subject: [PATCH 8/9] add log statement --- distributed/worker.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 82a97acdebf..5efef6700c6 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2295,13 +2295,20 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent: ) task_exc = result["actual-exception"] - if isinstance(task_exc, Reschedule) or ( + close_cancelled = ( self.status == Status.closing and isinstance(task_exc, asyncio.CancelledError) and iscoroutinefunction(function) - ): - # HACK: `Worker.cancel` will cause async user tasks will raise `CancelledError`. - # Since we cancelled those tasks, we shouldn't treat them as failures. + ) + if isinstance(task_exc, Reschedule) or close_cancelled: + if close_cancelled: + # `Worker.cancel` will cause async user tasks to raise `CancelledError`. + # Since we cancelled those tasks, we shouldn't treat them as failures. + # This is just a heuristic; it's _possible_ the task happened to + # fail independently with `CancelledError`. + logger.info( + f"Async task {key!r} cancelled during worker close; rescheduling." + ) return RescheduleEvent(key=ts.key, stimulus_id=f"reschedule-{time()}") logger.warning( From 0bcc3a3e00009438220bf7139cbee621529b98ef Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 21 Nov 2022 14:45:59 +0000 Subject: [PATCH 9/9] Code review --- distributed/worker.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 57685450813..a0c5c85e2f3 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2294,21 +2294,23 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent: ) task_exc = result["actual-exception"] - close_cancelled = ( + if isinstance(task_exc, Reschedule): + return RescheduleEvent(key=ts.key, stimulus_id=f"reschedule-{time()}") + if ( self.status == Status.closing and isinstance(task_exc, asyncio.CancelledError) and iscoroutinefunction(function) - ) - if isinstance(task_exc, Reschedule) or close_cancelled: - if close_cancelled: - # `Worker.cancel` will cause async user tasks to raise `CancelledError`. - # Since we cancelled those tasks, we shouldn't treat them as failures. - # This is just a heuristic; it's _possible_ the task happened to - # fail independently with `CancelledError`. - logger.info( - f"Async task {key!r} cancelled during worker close; rescheduling." - ) - return RescheduleEvent(key=ts.key, stimulus_id=f"reschedule-{time()}") + ): + # `Worker.cancel` will cause async user tasks to raise `CancelledError`. + # Since we cancelled those tasks, we shouldn't treat them as failures. + # This is just a heuristic; it's _possible_ the task happened to + # fail independently with `CancelledError`. + logger.info( + f"Async task {key!r} cancelled during worker close; rescheduling." + ) + return RescheduleEvent( + key=ts.key, stimulus_id=f"cancelled-by-worker-close-{time()}" + ) logger.warning( "Compute Failed\n"