From 87521cedefed759e482441a5f4d811a2015e7db4 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 17 Nov 2022 15:58:09 +0100 Subject: [PATCH 1/5] Guarantee worker is restarted if Nanny.kill is called --- distributed/core.py | 55 +++++++------ distributed/nanny.py | 133 ++++++++++++++----------------- distributed/scheduler.py | 4 +- distributed/tests/test_nanny.py | 60 +++++++++++++- distributed/tests/test_worker.py | 12 ++- distributed/worker.py | 3 + 6 files changed, 164 insertions(+), 103 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index 41e43485a99..3af99e6d3f9 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -351,6 +351,7 @@ def __init__( self.digests = None self._ongoing_background_tasks = AsyncTaskGroup() self._event_finished = asyncio.Event() + self._event_started = asyncio.Event() self.listeners = [] self.io_loop = self.loop = IOLoop.current() @@ -489,6 +490,9 @@ async def finished(self): """Wait until the server has finished""" await self._event_finished.wait() + async def started(self): + await self._event_started.wait() + def __await__(self): return self.start().__await__() @@ -507,30 +511,32 @@ async def start_unsafe(self): @final async def start(self): - async with self._startup_lock: - if self.status == Status.failed: - assert self.__startup_exc is not None - raise self.__startup_exc - elif self.status != Status.init: - return self - timeout = getattr(self, "death_timeout", None) - - async def _close_on_failure(exc: Exception) -> None: - await self.close() - self.status = Status.failed - self.__startup_exc = exc + if self.status == Status.failed: + assert self.__startup_exc is not None + raise self.__startup_exc + elif self.status != Status.init: + return self - try: + async def _close_on_failure(exc: Exception) -> None: + self._event_started.set() + await self.close() + self.status = Status.failed + self.__startup_exc = exc + + timeout = getattr(self, "death_timeout", None) + try: + async with self._startup_lock: 
await asyncio.wait_for(self.start_unsafe(), timeout=timeout) - except asyncio.TimeoutError as exc: - await _close_on_failure(exc) - raise asyncio.TimeoutError( - f"{type(self).__name__} start timed out after {timeout}s." - ) from exc - except Exception as exc: - await _close_on_failure(exc) - raise RuntimeError(f"{type(self).__name__} failed to start.") from exc - self.status = Status.running + self._event_started.set() + self.status = Status.running + except asyncio.TimeoutError as exc: + await _close_on_failure(exc) + raise asyncio.TimeoutError( + f"{type(self).__name__} start timed out after {timeout}s." + ) from exc + except Exception as exc: + await _close_on_failure(exc) + raise RuntimeError(f"{type(self).__name__} failed to start.") from exc return self async def __aenter__(self): @@ -741,7 +747,7 @@ async def _handle_comm(self, comm): logger.debug("Connection from %r to %s", address, type(self).__name__) self._comms[comm] = op - await self + await self.started() try: while not self.__stopped: try: @@ -940,6 +946,9 @@ async def close(self, timeout=None): await asyncio.gather(*[comm.close() for comm in list(self._comms)]) finally: self._event_finished.set() + logger.debug( + f"Closed {type(self).__name__} - {self.address_safe} - {self.id}" + ) def pingpong(comm): diff --git a/distributed/nanny.py b/distributed/nanny.py index 4ae0e28b48f..80c1d45f05c 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -13,10 +13,10 @@ import uuid import warnings import weakref +from collections import defaultdict from collections.abc import Collection from inspect import isawaitable from queue import Empty -from time import sleep as sync_sleep from typing import TYPE_CHECKING, Callable, ClassVar, Literal from toolz import merge @@ -119,6 +119,7 @@ class Nanny(ServerNode): # Inputs to parse_ports() _given_worker_port: int | str | Collection[int] | None _start_port: int | str | Collection[int] | None + _process_callback_received: defaultdict[WorkerProcess, 
asyncio.Event] def __init__( # type: ignore[no-untyped-def] self, @@ -223,6 +224,9 @@ def __init__( # type: ignore[no-untyped-def] self.validate = validate self.resources = resources + self._instantiate_lock = asyncio.Lock() + self._process_callback_received = defaultdict(asyncio.Event) + self.Worker = Worker if worker_class is None else worker_class self.pre_spawn_env = _get_env_variables("distributed.nanny.pre-spawn-environ") @@ -385,66 +389,50 @@ async def kill(self, timeout: float = 2, reason: str = "nanny-kill") -> None: return deadline = time() + timeout - await self.process.kill(reason=reason, timeout=0.8 * (deadline - time())) + proc = self.process + await proc.kill(reason=reason, timeout=0.8 * (deadline - time())) + assert proc.status in (Status.stopped, Status.failed), proc.status + assert proc.stopped.is_set() + await self._process_callback_received[proc].wait() + assert self.process is not proc async def instantiate(self) -> Status: """Start a local worker process Blocks until the process is up and the scheduler is properly informed """ - if self.process is None: - worker_kwargs = dict( - scheduler_ip=self.scheduler_addr, - nthreads=self.nthreads, - local_directory=self._original_local_dir, - services=self.services, - nanny=self.address, - name=self.name, - memory_limit=self.memory_manager.memory_limit, - resources=self.resources, - validate=self.validate, - silence_logs=self.silence_logs, - death_timeout=self.death_timeout, - preload=self.preload, - preload_argv=self.preload_argv, - security=self.security, - contact_address=self.contact_address, - ) - worker_kwargs.update(self.worker_kwargs) - self.process = WorkerProcess( - worker_kwargs=worker_kwargs, - silence_logs=self.silence_logs, - on_exit=self._on_worker_exit_sync, - worker=self.Worker, - env=self.env, - pre_spawn_env=self.pre_spawn_env, - config=self.config, - ) - - if self.death_timeout: - try: - result = await asyncio.wait_for( - self.process.start(), self.death_timeout + # The lock is 
required since there are many possible race conditions due + # to the worker exit callback + async with self._instantiate_lock: + if self.process is None: + worker_kwargs = dict( + scheduler_ip=self.scheduler_addr, + nthreads=self.nthreads, + local_directory=self._original_local_dir, + services=self.services, + nanny=self.address, + name=self.name, + memory_limit=self.memory_manager.memory_limit, + resources=self.resources, + validate=self.validate, + silence_logs=self.silence_logs, + death_timeout=self.death_timeout, + preload=self.preload, + preload_argv=self.preload_argv, + security=self.security, + contact_address=self.contact_address, ) - except asyncio.TimeoutError: - logger.error( - "Timed out connecting Nanny '%s' to scheduler '%s'", - self, - self.scheduler_addr, + worker_kwargs.update(self.worker_kwargs) + self.process = WorkerProcess( + worker_kwargs=worker_kwargs, + silence_logs=self.silence_logs, + on_exit=self._on_worker_exit_sync, + worker=self.Worker, + env=self.env, + pre_spawn_env=self.pre_spawn_env, + config=self.config, ) - await self.close( - timeout=self.death_timeout, reason="nanny-instantiate-timeout" - ) - raise - - else: - try: - result = await self.process.start() - except Exception: - logger.error("Failed to start process", exc_info=True) - await self.close(reason="nanny-instantiate-failed") - raise - return result + return await self.process.start() @log_errors async def plugin_add(self, plugin=None, name=None): @@ -519,6 +507,9 @@ def _on_worker_exit_sync(self, exitcode): @log_errors async def _on_worker_exit(self, exitcode): + assert self.process + self._process_callback_received[self.process].set() + self.process = None if self.status not in ( Status.init, Status.closing, @@ -550,6 +541,8 @@ async def _on_worker_exit(self, exitcode): logger.error( "Failed to restart worker after its process exited", exc_info=True ) + await self.close(reason="worker-failed-restart") + raise @property def pid(self): @@ -578,11 +571,14 @@ async def 
close( """ if self.status == Status.closing: await self.finished() - assert self.status == Status.closed + assert self.status in (Status.closed, Status.failed) - if self.status == Status.closed: + if self.status in (Status.closed, Status.failed): return "OK" + # Make sure we're not colliding with the startup coro when setting the + # status to closing + await self.started() self.status = Status.closing logger.info("Closing Nanny at %r. Reason: %s", self.address_safe, reason) @@ -726,6 +722,7 @@ async def start(self) -> Status: self.running.set() init_q.close() + init_q.join_thread() return self.status @@ -817,22 +814,20 @@ async def kill( "reason": reason, } ) - await asyncio.sleep(0) # otherwise we get broken pipe errors queue.close() + queue.join_thread() del queue try: try: await process.join(wait_timeout) - return except asyncio.TimeoutError: - pass - - logger.warning( - f"Worker process still alive after {wait_timeout} seconds, killing" - ) - await process.kill() - await process.join(max(0, deadline - time())) + logger.warning( + f"Worker process still alive after {wait_timeout} seconds, killing" + ) + await process.kill() + await process.join(max(0, deadline - time())) + await self.stopped.wait() except ValueError as e: if "invalid operation on closed AsyncProcess" in str(e): return @@ -934,6 +929,7 @@ async def run() -> None: } ) init_result_q.close() + init_result_q.join_thread() await worker.finished() logger.info("Worker closed") except Exception as e: @@ -943,14 +939,7 @@ async def run() -> None: logger.exception(f"Failed to {failure_type} worker") init_result_q.put({"uid": uid, "exception": e}) init_result_q.close() - # If we hit an exception here we need to wait for a least - # one interval for the outside to pick up this message. - # Otherwise we arrive in a race condition where the process - # cleanup wipes the queue before the exception can be - # properly handled. 
See also - # WorkerProcess._wait_until_connected (the 3 is for good - # measure) - sync_sleep(cls._init_msg_interval * 3) + init_result_q.join_thread() with contextlib.ExitStack() as stack: diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 288194c6573..a28b6bf616f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3958,7 +3958,9 @@ async def log_errors(func): await asyncio.gather( *[log_errors(plugin.before_close) for plugin in list(self.plugins.values())] ) - + # Make sure we're not colliding with the startup coro when setting the + # status to closing + await self.started() self.status = Status.closing logger.info("Scheduler closing...") diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 0fe602c14aa..58a7b6a5dbb 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -26,7 +26,7 @@ from distributed import Nanny, Scheduler, Worker, profile, rpc, wait, worker from distributed.compatibility import LINUX, WINDOWS -from distributed.core import CommClosedError, Status +from distributed.core import CommClosedError, ConnectionPool, Status from distributed.diagnostics import SchedulerPlugin from distributed.metrics import time from distributed.protocol.pickle import dumps @@ -543,8 +543,38 @@ async def test_worker_start_exception(s): # ^ NOTE: `Nanny.close` sets it to `closed`, then `Server.start._close_on_failure` sets it to `failed` assert nanny.process is None assert "Restarting worker" not in logs.getvalue() - # Avoid excessive spewing. (It's also printed once extra within the subprocess, which is okay.) 
- assert logs.getvalue().count("ValueError: broken") == 1, logs.getvalue() + + +@pytest.mark.parametrize( + "api", + [ + "kill", + "restart", + ], +) +@gen_cluster(nthreads=[]) +async def test_worker_start_exception_after_restart(s, api): + async with Nanny(s.address) as nanny: + # A restart should fail + nanny.worker_kwargs.update( + { + "scheduler_port": -1234, + "nthreads": -42, + "port": -9876, + "protocol": "doesnt-exit", + } + ) + if api == "kill": + # Kill is not immediately restarting the process and is therefore + # not raising an exception and we need to wait + await nanny.kill() + await nanny.finished() + else: + # something is failing, we do not care too much what exactly + with pytest.raises(Exception): + await nanny.restart() + await nanny.finished() + assert nanny.status == Status.closed @gen_cluster(nthreads=[]) @@ -760,3 +790,27 @@ async def test_worker_inherits_temp_config(c, s): async with Nanny(s.address): out = await c.submit(lambda: dask.config.get("test123")) assert out == 123 + + +@pytest.mark.slow +@pytest.mark.parametrize("api", ["restart", "kill"]) +@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) +async def test_restart_stress(c, s, a, api): + async def keep_killing(): + pool = await ConnectionPool() + try: + rpc = pool(a.address) + for _ in range(2): + try: + meth = getattr(rpc, api) + await meth(reason="scheduler-restart") + except OSError: + break + + await asyncio.sleep(0.1) + finally: + await pool.close() + + kill_tasks = [asyncio.create_task(keep_killing()) for _ in range(2)] + await asyncio.gather(*kill_tasks) + assert a.status == Status.running diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index ada49d2feb4..6f9b5c507d3 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -54,6 +54,7 @@ from distributed.metrics import time from distributed.protocol import pickle from distributed.scheduler import KilledWorker, Scheduler +from distributed.utils import 
open_port from distributed.utils_test import ( NO_AMM, BlockedExecute, @@ -349,18 +350,21 @@ async def test_worker_port_range(s): @pytest.mark.slow @gen_test(timeout=60) async def test_worker_waits_for_scheduler(): - w = Worker("127.0.0.1:8724") + port = open_port() + w = Worker(f"127.0.0.1:{port}") async def f(): - await w + async with w: + pass task = asyncio.create_task(f()) await asyncio.sleep(3) assert not task.done() - task.cancel() assert w.status not in (Status.closed, Status.running, Status.paused) - await w.close(timeout=0.1) + + async with Scheduler(port=port): + await task @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) diff --git a/distributed/worker.py b/distributed/worker.py index 4242648d950..984417880ea 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1516,6 +1516,9 @@ async def close( # type: ignore logger.info("Closed worker has not yet started: %s", self.status) if not executor_wait: logger.info("Not waiting on executor to close") + # Make sure we're not colliding with the startup coro when setting the + # status to closing + await self.started() self.status = Status.closing # Stop callbacks before giving up control in any `await`. From 6d675ba51b821f8c693225f10fcba81c3a8c8e27 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 1 Dec 2022 13:39:35 +0100 Subject: [PATCH 2/5] WorkerProcess blocks on kill if still starting --- distributed/nanny.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/distributed/nanny.py b/distributed/nanny.py index 80c1d45f05c..4f17a08db10 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -788,13 +788,20 @@ async def kill( """ deadline = time() + timeout + # If the process is not properly up it will not watch the closing queue + # and we may end up leaking this process. + # Therefore wait for it to be properly started before killing it. 
+ if self.status == Status.starting: + await self.running.wait() + if self.status == Status.stopped: return + if self.status == Status.stopping: await self.stopped.wait() return + assert self.status in ( - Status.starting, Status.running, Status.failed, # process failed to start, but hasn't been joined yet ), self.status From 9d9c46d08cf39affc767ce7b4897fbfea3ad949e Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 12 Dec 2022 19:33:37 +0100 Subject: [PATCH 3/5] Verify status again in instantiate --- distributed/core.py | 10 ++++++++-- distributed/nanny.py | 13 ++++++++++++- distributed/tests/test_nanny.py | 15 +++++---------- distributed/worker.py | 1 + 4 files changed, 26 insertions(+), 13 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index 3af99e6d3f9..c663ec4d402 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -496,6 +496,9 @@ async def started(self): def __await__(self): return self.start().__await__() + async def cancel_start(self): + self._startup_task.cancel() + async def start_unsafe(self): """Attempt to start the server. This is not idempotent and not protected against concurrent startup attempts. 
@@ -526,8 +529,11 @@ async def _close_on_failure(exc: Exception) -> None: timeout = getattr(self, "death_timeout", None) try: async with self._startup_lock: - await asyncio.wait_for(self.start_unsafe(), timeout=timeout) - self._event_started.set() + self._startup_task = asyncio.create_task(self.start_unsafe()) + self._startup_task.add_done_callback( + lambda _: self._event_started.set() + ) + await asyncio.wait_for(self._startup_task, timeout=timeout) self.status = Status.running except asyncio.TimeoutError as exc: await _close_on_failure(exc) diff --git a/distributed/nanny.py b/distributed/nanny.py index 4f17a08db10..44548b674a1 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -404,6 +404,15 @@ async def instantiate(self) -> Status: # The lock is required since there are many possible race conditions due # to the worker exit callback async with self._instantiate_lock: + if self.status in ( + Status.closing, + Status.closed, + Status.closing_gracefully, + Status.failed, + ): + raise RuntimeError( + "Tried to start a worker on closed Nanny. This can happen if an error occurred during restart. Please check logs for more information." + ) if self.process is None: worker_kwargs = dict( scheduler_ip=self.scheduler_addr, @@ -578,9 +587,11 @@ async def close( # Make sure we're not colliding with the startup coro when setting the # status to closing + logger.info("Closing Nanny at %r. Reason: %s", self.address_safe, reason) + await self.cancel_start() await self.started() + self.status = Status.closing - logger.info("Closing Nanny at %r.
Reason: %s", self.address_safe, reason) for preload in self.preloads: await preload.teardown() diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 58a7b6a5dbb..f19d3208380 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -554,16 +554,11 @@ async def test_worker_start_exception(s): ) @gen_cluster(nthreads=[]) async def test_worker_start_exception_after_restart(s, api): - async with Nanny(s.address) as nanny: - # A restart should fail - nanny.worker_kwargs.update( - { - "scheduler_port": -1234, - "nthreads": -42, - "port": -9876, - "protocol": "doesnt-exit", - } - ) + async with Nanny(s.address, death_timeout="2s") as nanny: + # Stop the listener on the scheduler, i.e. do not allow any new incoming + # connections. The restarting workers will fail while trying to attempt + # connection + s.stop() if api == "kill": # Kill is not immediately restarting the process and is therefore # not raising an exception and we need to wait diff --git a/distributed/worker.py b/distributed/worker.py index 984417880ea..1db5aca13f8 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1518,6 +1518,7 @@ async def close( # type: ignore logger.info("Not waiting on executor to close") # Make sure we're not colliding with the startup coro when setting the # status to closing + await self.cancel_start() await self.started() self.status = Status.closing From f6b8f8fb2fd435e813b7e8d15766a765d0e5fd74 Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 13 Dec 2022 13:51:02 +0100 Subject: [PATCH 4/5] More fixes --- distributed/core.py | 9 +++++---- distributed/nanny.py | 9 ++------- distributed/tests/test_nanny.py | 2 +- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index c663ec4d402..acf16cc9f71 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -419,6 +419,7 @@ def set_thread_ident(): self.io_loop.add_callback(set_thread_ident) self._startup_lock 
= asyncio.Lock() self.__startup_exc = None + self.__startup_task = None self.rpc = ConnectionPool( limit=connection_limit, @@ -497,7 +498,7 @@ def __await__(self): return self.start().__await__() async def cancel_start(self): - self._startup_task.cancel() + self.__startup_task.cancel() async def start_unsafe(self): """Attempt to start the server. This is not idempotent and not protected against concurrent startup attempts. @@ -529,11 +530,11 @@ async def _close_on_failure(exc: Exception) -> None: timeout = getattr(self, "death_timeout", None) try: async with self._startup_lock: - self._startup_task = asyncio.create_task(self.start_unsafe()) - self._startup_task.add_done_callback( + self.__startup_task = asyncio.create_task(self.start_unsafe()) + self.__startup_task.add_done_callback( lambda _: self._event_started.set() ) - await asyncio.wait_for(self._startup_task, timeout=timeout) + await asyncio.wait_for(self.__startup_task, timeout=timeout) self.status = Status.running except asyncio.TimeoutError as exc: await _close_on_failure(exc) diff --git a/distributed/nanny.py b/distributed/nanny.py index 44548b674a1..7d4301160ab 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -13,7 +13,6 @@ import uuid import warnings import weakref -from collections import defaultdict from collections.abc import Collection from inspect import isawaitable from queue import Empty @@ -119,7 +118,6 @@ class Nanny(ServerNode): # Inputs to parse_ports() _given_worker_port: int | str | Collection[int] | None _start_port: int | str | Collection[int] | None - _process_callback_received: defaultdict[WorkerProcess, asyncio.Event] def __init__( # type: ignore[no-untyped-def] self, @@ -225,7 +223,6 @@ def __init__( # type: ignore[no-untyped-def] self.resources = resources self._instantiate_lock = asyncio.Lock() - self._process_callback_received = defaultdict(asyncio.Event) self.Worker = Worker if worker_class is None else worker_class @@ -392,8 +389,7 @@ async def kill(self, timeout: 
float = 2, reason: str = "nanny-kill") -> None: proc = self.process await proc.kill(reason=reason, timeout=0.8 * (deadline - time())) assert proc.status in (Status.stopped, Status.failed), proc.status - assert proc.stopped.is_set() - await self._process_callback_received[proc].wait() + await proc.stopped.wait() assert self.process is not proc async def instantiate(self) -> Status: @@ -517,7 +513,6 @@ def _on_worker_exit_sync(self, exitcode): @log_errors async def _on_worker_exit(self, exitcode): assert self.process - self._process_callback_received[self.process].set() self.process = None if self.status not in ( Status.init, @@ -768,7 +763,6 @@ def mark_stopped(self): msg = self._death_message(self.process.pid, r) logger.info(msg) self.status = Status.stopped - self.stopped.set() # Release resources self.process.close() self.init_result_q = None @@ -781,6 +775,7 @@ def mark_stopped(self): # User hook if self.on_exit is not None: self.on_exit(r) + self.stopped.set() async def kill( self, diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index f19d3208380..6fd49ae917d 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -554,7 +554,7 @@ async def test_worker_start_exception(s): ) @gen_cluster(nthreads=[]) async def test_worker_start_exception_after_restart(s, api): - async with Nanny(s.address, death_timeout="2s") as nanny: + async with Nanny(s.address, death_timeout="5s") as nanny: # Stop the listener on the scheduler, i.e. do not allow any new incoming # connections. 
The restarting workers will fail while trying to attempt # connection From 178f57faca12b95dfa35c3d9e03385ac428503d6 Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 13 Dec 2022 14:16:38 +0100 Subject: [PATCH 5/5] check if None --- distributed/core.py | 4 +++- distributed/nanny.py | 1 - distributed/worker.py | 1 - 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index acf16cc9f71..ccc781cb899 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -498,7 +498,9 @@ def __await__(self): return self.start().__await__() async def cancel_start(self): - self.__startup_task.cancel() + if self.__startup_task: + self.__startup_task.cancel() + await self.started() async def start_unsafe(self): """Attempt to start the server. This is not idempotent and not protected against concurrent startup attempts. diff --git a/distributed/nanny.py b/distributed/nanny.py index 7d4301160ab..fcd3c5241ba 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -584,7 +584,6 @@ async def close( # status to closing logger.info("Closing Nanny at %r. Reason: %s", self.address_safe, reason) await self.cancel_start() - await self.started() self.status = Status.closing diff --git a/distributed/worker.py b/distributed/worker.py index 1db5aca13f8..c3e1044b32e 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1519,7 +1519,6 @@ async def close( # type: ignore # Make sure we're not colliding with the startup coro when setting the # status to closing await self.cancel_start() - await self.started() self.status = Status.closing # Stop callbacks before giving up control in any `await`.