From 0adc10b7c387170052ca44d1b70a1a4f0893b008 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 22 Jun 2023 12:13:57 +0100 Subject: [PATCH 1/3] Add Cluster.called_from_running_loop and fix Cluster.asynchronous --- distributed/deploy/cluster.py | 9 +++++++++ distributed/deploy/spec.py | 9 +-------- distributed/utils.py | 5 ++++- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index f0e5bd595c1..0a86d001bfc 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -114,6 +114,15 @@ def loop(self, value: IOLoop) -> None: raise ValueError("expected an IOLoop, got None") self.__loop = value + @property + def called_from_running_loop(self): + try: + return ( + getattr(self.loop, "asyncio_loop", None) is asyncio.get_running_loop() + ) + except RuntimeError: + return self.asynchronous + @property def name(self): return self._cluster_info["name"] diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index fe51468a68a..de48a231ad4 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -279,14 +279,7 @@ def __init__( scheduler_sync_interval=scheduler_sync_interval, ) - try: - called_from_running_loop = ( - getattr(loop, "asyncio_loop", None) is asyncio.get_running_loop() - ) - except RuntimeError: - called_from_running_loop = asynchronous - - if not called_from_running_loop: + if not self.called_from_running_loop: self._loop_runner.start() self.sync(self._start) try: diff --git a/distributed/utils.py b/distributed/utils.py index 53484577688..1b0c98d3978 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -334,7 +334,10 @@ class SyncMethodMixin: @property def asynchronous(self): """Are we running in the event loop?""" - return in_async_call(self.loop, default=getattr(self, "_asynchronous", False)) + try: + return in_async_call(self.loop, default=getattr(self, "_asynchronous", False)) + except RuntimeError as e: + return False def sync(self, func, *args, asynchronous=None, callback_timeout=None, **kwargs): """Call `func` with `args` synchronously or asynchronously depending on From 2a15f002e7adbbee9046fd5b99390d345d9ff596 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 22 Jun 2023 12:29:12 +0100 Subject: [PATCH 2/3] Store asynchronous kwarg in private attribute --- distributed/deploy/cluster.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 0a86d001bfc..09c2ed0de5d 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -71,6 +71,7 @@ def __init__( scheduler_sync_interval=1, ): self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous) + self.__asynchronous = asynchronous self.scheduler_info = {"workers": {}} self.periodic_callbacks = {} @@ -121,7 +122,7 @@ def called_from_running_loop(self): getattr(self.loop, "asyncio_loop", None) is asyncio.get_running_loop() ) except RuntimeError: - return self.asynchronous + return self.__asynchronous @property def name(self): From e9601a9e9bc4d5c207d98e154fbd43ec72743057 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 22 Jun 2023 12:34:50 +0100 Subject: [PATCH 3/3] Linting --- distributed/utils.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/distributed/utils.py b/distributed/utils.py index 1b0c98d3978..a7494234f0a 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -335,8 +335,10 @@ class SyncMethodMixin: def asynchronous(self): """Are we running in the event loop?""" try: - return in_async_call(self.loop, default=getattr(self, "_asynchronous", False)) - except RuntimeError as e: + return in_async_call( + self.loop, default=getattr(self, "_asynchronous", False) + ) + except RuntimeError: return False def sync(self, func, *args, asynchronous=None, callback_timeout=None, **kwargs):