From 3e4b188f1c999bc62a42f1fb290527ad754cf29b Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 21 Apr 2021 12:43:44 -0500 Subject: [PATCH 1/2] Avoid active_threads changing size during iteration --- distributed/worker.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index 821b3084a86..3ed5431edf7 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -950,9 +950,11 @@ async def heartbeat(self): address=self.contact_address, now=start, metrics=await self.get_metrics(), + # Create a copy of `self.active_threads.values()` to avoid + # `self.active_threads` changing size during iteration executing={ key: start - self.tasks[key].start_time - for key in self.active_threads.values() + for key in list(self.active_threads.values()) if key in self.tasks }, ) From 101af7cce1840c12819b668385cfb0a1164adae4 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 22 Apr 2021 11:17:34 -0500 Subject: [PATCH 2/2] Use active_threads_lock --- distributed/worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 3ed5431edf7..34323cb4584 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -945,16 +945,16 @@ async def heartbeat(self): logger.debug("Heartbeat: %s", self.address) try: start = time() + with self.active_threads_lock: + active_keys = list(self.active_threads.values()) response = await retry_operation( self.scheduler.heartbeat_worker, address=self.contact_address, now=start, metrics=await self.get_metrics(), - # Create a copy of `self.active_threads.values()` to avoid - # `self.active_threads` changing size during iteration executing={ key: start - self.tasks[key].start_time - for key in list(self.active_threads.values()) + for key in active_keys if key in self.tasks }, )