diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 29b789618c7..6c1d73f00bd 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1595,7 +1595,6 @@ async def f(ev): assert s.tasks["f1"].state in ("queued", "no-worker") -@pytest.mark.slow @gen_cluster(client=True, nthreads=[("", 1)]) async def test_close_async_task_handles_cancellation(c, s, a): ev = Event() @@ -1622,6 +1621,8 @@ async def f(ev): task.cancel() await asyncio.wait({task}) + await a.close(timeout=1) + @pytest.mark.slow @gen_cluster(client=True, nthreads=[("", 1)], timeout=10) diff --git a/distributed/worker.py b/distributed/worker.py index 27dbc5166f2..7a4e2adbc3d 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -29,7 +29,7 @@ from concurrent.futures import Executor from contextlib import suppress from datetime import timedelta -from functools import wraps +from functools import wraps, partial from inspect import isawaitable from typing import ( TYPE_CHECKING, @@ -2304,17 +2304,11 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent: try: ts.start_time = time() if iscoroutinefunction(function): - token = _worker_cvar.set(self) - try: - result = await apply_function_async( - function, - args2, - kwargs2, - self.scheduler_delay, - ) - finally: - _worker_cvar.reset(token) - elif "ThreadPoolExecutor" in str(type(e)): + coro = function(*args2, **kwargs2) + function = partial(asyncio.run, coro) + args2 = () + kwargs2 = {} + if "ThreadPoolExecutor" in str(type(e)): # The 'executor' time metric should be almost zero most of the time, # e.g. thread synchronization overhead only, since thread-noncpu and # thread-cpu inside the thread detract from it. However, it may @@ -3157,54 +3151,6 @@ def apply_function_simple( return msg -async def apply_function_async( - function, - args, - kwargs, - time_delay, -): - """Run a function, collect information - - Returns - ------- - msg: dictionary with status, result/error, timings, etc.. - """ - ident = threading.get_ident() - try: - with context_meter.meter("thread-noncpu", func=time) as m: - result = await function(*args, **kwargs) - except (SystemExit, KeyboardInterrupt): - # Special-case these, just like asyncio does all over the place. They will pass - # through `fail_hard` and `_handle_stimulus_from_task`, and eventually be caught - # by special-case logic in asyncio: - # https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82 - # Any other `BaseException` types would ultimately be ignored by asyncio if - # raised here, after messing up the worker state machine along their way. - raise - except BaseException as e: - # NOTE: this includes `CancelledError`! Since it's a user task, that's _not_ a - # reason to shut down the worker. - # Users _shouldn't_ use `BaseException`s, but if they do, we can assume they - # aren't a reason to shut down the whole system (since we allow the - # system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through) - msg = error_message(e) - msg["op"] = "task-erred" - msg["actual-exception"] = e - else: - msg = { - "op": "task-finished", - "status": "OK", - "result": result, - "nbytes": sizeof(result), - "type": type(result) if result is not None else None, - } - - msg["start"] = m.start + time_delay - msg["stop"] = m.stop + time_delay - msg["thread"] = ident - return msg - - def apply_function_actor( function, args, kwargs, execution_state, key, active_threads, active_threads_lock ):