Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1595,7 +1595,6 @@ async def f(ev):
assert s.tasks["f1"].state in ("queued", "no-worker")


@pytest.mark.slow
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_close_async_task_handles_cancellation(c, s, a):
ev = Event()
Expand All @@ -1622,6 +1621,8 @@ async def f(ev):
task.cancel()
await asyncio.wait({task})

await a.close(timeout=1)


@pytest.mark.slow
@gen_cluster(client=True, nthreads=[("", 1)], timeout=10)
Expand Down
66 changes: 6 additions & 60 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from concurrent.futures import Executor
from contextlib import suppress
from datetime import timedelta
from functools import wraps
from functools import wraps, partial
from inspect import isawaitable
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -2304,17 +2304,11 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent:
try:
ts.start_time = time()
if iscoroutinefunction(function):
token = _worker_cvar.set(self)
try:
result = await apply_function_async(
function,
args2,
kwargs2,
self.scheduler_delay,
)
finally:
_worker_cvar.reset(token)
elif "ThreadPoolExecutor" in str(type(e)):
coro = function(*args2, **kwargs2)
function = partial(asyncio.run, coro)
args2 = ()
kwargs2 = {}
if "ThreadPoolExecutor" in str(type(e)):
# The 'executor' time metric should be almost zero most of the time,
# e.g. thread synchronization overhead only, since thread-noncpu and
# thread-cpu inside the thread detract from it. However, it may
Expand Down Expand Up @@ -3157,54 +3151,6 @@ def apply_function_simple(
return msg


async def apply_function_async(
function,
args,
kwargs,
time_delay,
):
"""Run a function, collect information

Returns
-------
msg: dictionary with status, result/error, timings, etc..
"""
ident = threading.get_ident()
try:
with context_meter.meter("thread-noncpu", func=time) as m:
result = await function(*args, **kwargs)
except (SystemExit, KeyboardInterrupt):
# Special-case these, just like asyncio does all over the place. They will pass
# through `fail_hard` and `_handle_stimulus_from_task`, and eventually be caught
# by special-case logic in asyncio:
# https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82
# Any other `BaseException` types would ultimately be ignored by asyncio if
# raised here, after messing up the worker state machine along their way.
raise
except BaseException as e:
# NOTE: this includes `CancelledError`! Since it's a user task, that's _not_ a
# reason to shut down the worker.
# Users _shouldn't_ use `BaseException`s, but if they do, we can assume they
# aren't a reason to shut down the whole system (since we allow the
# system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through)
msg = error_message(e)
msg["op"] = "task-erred"
msg["actual-exception"] = e
else:
msg = {
"op": "task-finished",
"status": "OK",
"result": result,
"nbytes": sizeof(result),
"type": type(result) if result is not None else None,
}

msg["start"] = m.start + time_delay
msg["stop"] = m.stop + time_delay
msg["thread"] = ident
return msg


def apply_function_actor(
function, args, kwargs, execution_state, key, active_threads, active_threads_lock
):
Expand Down