Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
)
from distributed.metrics import time
from distributed.protocol import pickle
from distributed.scheduler import Scheduler
from distributed.scheduler import KilledWorker, Scheduler
from distributed.utils_test import (
NO_AMM,
BlockedExecute,
Expand Down Expand Up @@ -421,6 +421,44 @@ def __str__(self):
assert "Bar" in str(e.__cause__)


@pytest.mark.slow
@pytest.mark.parametrize("sync", [True, False])
@pytest.mark.parametrize(
"exc_type", [BaseException, SystemExit, KeyboardInterrupt, asyncio.CancelledError]
)
@gen_cluster(
nthreads=[("", 1)],
client=True,
Worker=Nanny,
config={"distributed.scheduler.allowed-failures": 0},
)
async def test_base_exception_in_task(c, s, a, sync, exc_type):
if sync:

def raiser():
raise exc_type(f"this is a {exc_type}")

else:

async def raiser():
raise exc_type(f"this is a {exc_type}")

f = c.submit(raiser)

try:
with pytest.raises(
KilledWorker if exc_type in (SystemExit, KeyboardInterrupt) else exc_type
):
await f
except BaseException as e:
# Prevent test failure from killing the whole pytest process
traceback.print_exc()
pytest.fail(f"BaseException propagated back to test: {e!r}. See stdout.")

# Nanny restarts it
await c.wait_for_workers(1)


@gen_test()
async def test_plugin_exception():
class MyPlugin:
Expand Down Expand Up @@ -1547,7 +1585,7 @@ async def f(ev):
task for task in asyncio.all_tasks() if "execute(f1)" in task.get_name()
)
await a.close()
assert task.cancelled()
assert task.done()
assert s.tasks["f1"].state in ("queued", "no-worker")


Expand Down
46 changes: 43 additions & 3 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2293,8 +2293,24 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent:
stimulus_id=f"task-finished-{time()}",
)

if isinstance(result["actual-exception"], Reschedule):
task_exc = result["actual-exception"]
if isinstance(task_exc, Reschedule):
return RescheduleEvent(key=ts.key, stimulus_id=f"reschedule-{time()}")
if (
self.status == Status.closing
and isinstance(task_exc, asyncio.CancelledError)
and iscoroutinefunction(function)
):
# `Worker.cancel` will cause async user tasks to raise `CancelledError`.
# Since we cancelled those tasks, we shouldn't treat them as failures.
# This is just a heuristic; it's _possible_ the task happened to
# fail independently with `CancelledError`.
logger.info(
f"Async task {key!r} cancelled during worker close; rescheduling."
)
return RescheduleEvent(
key=ts.key, stimulus_id=f"cancelled-by-worker-close-{time()}"
)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a cosmetic tweak to this.
Could you add a unit test for it?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already covered by distributed/tests/test_worker.py::test_close_while_executing[False]. I could write another test for it, but it would look almost exactly the same.

The CancelledError when not closing case is also covered by test_base_exception_in_task.


logger.warning(
"Compute Failed\n"
Expand Down Expand Up @@ -3014,7 +3030,18 @@ def apply_function_simple(
start = time()
try:
result = function(*args, **kwargs)
except Exception as e:
except (SystemExit, KeyboardInterrupt):
# Special-case these, just like asyncio does all over the place. They will pass
# through `fail_hard` and `_handle_stimulus_from_task`, and eventually be caught
# by special-case logic in asyncio:
# https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82
# Any other `BaseException` types would ultimately be ignored by asyncio if
# raised here, after messing up the worker state machine along their way.
raise
except BaseException as e:
# Users _shouldn't_ use `BaseException`s, but if they do, we can assume they
# aren't a reason to shut down the whole system (since we allow the
# system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through)
msg = error_message(e)
msg["op"] = "task-erred"
msg["actual-exception"] = e
Expand Down Expand Up @@ -3050,7 +3077,20 @@ async def apply_function_async(
start = time()
try:
result = await function(*args, **kwargs)
except Exception as e:
except (SystemExit, KeyboardInterrupt):
# Special-case these, just like asyncio does all over the place. They will pass
# through `fail_hard` and `_handle_stimulus_from_task`, and eventually be caught
# by special-case logic in asyncio:
# https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82
# Any other `BaseException` types would ultimately be ignored by asyncio if
# raised here, after messing up the worker state machine along their way.
raise
except BaseException as e:
# NOTE: this includes `CancelledError`! Since it's a user task, that's _not_ a
# reason to shut down the worker.
# Users _shouldn't_ use `BaseException`s, but if they do, we can assume they
# aren't a reason to shut down the whole system (since we allow the
# system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through)
msg = error_message(e)
msg["op"] = "task-erred"
msg["actual-exception"] = e
Expand Down