diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index 96279d15323..66011444c76 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -308,10 +308,7 @@ async def _correct_state_internal(self): if self.scheduler.status == "running": await self.scheduler_comm.retire_workers(workers=list(to_close)) tasks = [self.workers[w].close() for w in to_close if w in self.workers] - await asyncio.wait(tasks) - for task in tasks: # for tornado gen.coroutine support - with ignoring(RuntimeError): - await task + await asyncio.gather(*tasks) for name in to_close: if name in self.workers: del self.workers[name] @@ -330,10 +327,9 @@ async def _correct_state_internal(self): self._created.add(worker) workers.append(worker) if workers: - await asyncio.wait(workers) + await asyncio.gather(*workers) for w in workers: w._cluster = weakref.ref(self) - await w # for tornado gen.coroutine support self.workers.update(dict(zip(to_open, workers))) def _update_worker_status(self, op, msg): @@ -366,7 +362,7 @@ async def _(): await self.scheduler await self._correct_state() if self.workers: - await asyncio.wait(list(self.workers.values())) # maybe there are more + await asyncio.gather(*self.workers.values()) # maybe there are more return self return _().__await__() @@ -595,8 +591,6 @@ async def run_spec(spec: dict, *args): if workers: await asyncio.gather(*workers.values()) - for w in workers.values(): - await w # for tornado gen.coroutine support return workers