From e743f9d95bc3a911ff715a3d9dc3aae6b1ecd561 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 18 May 2022 18:19:33 +0200 Subject: [PATCH 1/3] Ensure find-missing is not running concurrently --- distributed/worker.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index 57a872455fd..796fd6a62f0 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -827,6 +827,7 @@ def __init__( # FIXME annotations: https://github.com/tornadoweb/tornado/issues/3117 pc = PeriodicCallback(self.find_missing, 1000) # type: ignore + self._find_missing_running = False self.periodic_callbacks["find-missing"] = pc self._address = contact_address @@ -3432,9 +3433,10 @@ def _readd_busy_worker(self, worker: str) -> None: @log_errors async def find_missing(self) -> None: - if not self._missing_dep_flight: + if self._find_missing_running or not self._missing_dep_flight: return try: + self._find_missing_running = True if self.validate: for ts in self._missing_dep_flight: assert not ts.who_has @@ -3452,6 +3454,7 @@ async def find_missing(self) -> None: self.transitions(recommendations, stimulus_id=stimulus_id) finally: + self._find_missing_running = False # This is quite arbitrary but the heartbeat has scaling implemented self.periodic_callbacks[ "find-missing" From dd465f6299afe0f7f5eca3d36de65a8dbe60a118 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 18 May 2022 18:20:03 +0200 Subject: [PATCH 2/3] Remove incorrect assert in handle_compute_task --- distributed/tests/test_cancelled_state.py | 58 +++++++++++++++++++++++ distributed/worker.py | 7 --- 2 files changed, 58 insertions(+), 7 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 1abb00db554..50b566643be 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -2,6 +2,7 @@ import distributed from distributed import Event, Lock, Worker +from distributed.client import wait from distributed.utils_test import ( _LockedCommPool, assert_story, @@ -396,3 +397,60 @@ def block_execution(event, lock): await lock_executing.release() assert await fut2 == 2 + + +@gen_cluster(client=True, nthreads=[("", 1)] * 2) +async def test_cancelled_resumed_after_flight_with_dependencies(c, s, w2, w3): + # See https://github.com/dask/distributed/pull/6327#discussion_r872231090 + block_get_data_1 = asyncio.Lock() + enter_get_data_1 = asyncio.Event() + await block_get_data_1.acquire() + + class BlockGetDataWorker(Worker): + def __init__(self, *args, get_data_event, get_data_lock, **kwargs): + self._get_data_event = get_data_event + self._get_data_lock = get_data_lock + super().__init__(*args, **kwargs) + + async def get_data(self, comm, *args, **kwargs): + self._get_data_event.set() + async with self._get_data_lock: + return await super().get_data(comm, *args, **kwargs) + + async with await BlockGetDataWorker( + s.address, + get_data_event=enter_get_data_1, + get_data_lock=block_get_data_1, + name="w1", + ) as w1: + + f1 = c.submit(inc, 1, key="f1", workers=[w1.address]) + f2 = c.submit(inc, 2, key="f2", workers=[w1.address]) + f3 = c.submit(sum, [f1, f2], workers=[w1.address]) + + await wait(f3) + f4 = c.submit(inc, f3, key="f4", workers=[w2.address]) + + await enter_get_data_1.wait() + s.set_restrictions( + { + f1.key: {w3.address}, + f2.key: {w3.address}, + f3.key: {w2.address}, + } + ) + await s.remove_worker(w1.address, "stim-id") + + while w2.tasks[f3.key].state != "resumed": + await asyncio.sleep(0.1) + assert_story( + w2.log, + [ + (f3.key, "flight", "released", "cancelled", {}), + # ... + (f3.key, "cancelled", "waiting", "resumed", {}), + ], + ) + # w1 closed + + assert await f4 == 6 diff --git a/distributed/worker.py b/distributed/worker.py index 796fd6a62f0..84f25221a63 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1984,13 +1984,6 @@ def handle_compute_task( self.transitions(recommendations, stimulus_id=stimulus_id) self._handle_instructions(instructions) - if self.validate: - # All previously unknown tasks that were created above by - # ensure_tasks_exists() have been transitioned to fetch or flight - assert all( - ts2.state != "released" for ts2 in (ts, *ts.dependencies) - ), self.story(ts, *ts.dependencies) - ######################## # Worker State Machine # ######################## From 3e9134b0d787796c31425f095074c6b27e03a609 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 19 May 2022 09:54:58 +0200 Subject: [PATCH 3/3] Code review --- distributed/tests/test_cancelled_state.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 50b566643be..76ac441ead3 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -426,7 +426,7 @@ async def get_data(self, comm, *args, **kwargs): f1 = c.submit(inc, 1, key="f1", workers=[w1.address]) f2 = c.submit(inc, 2, key="f2", workers=[w1.address]) - f3 = c.submit(sum, [f1, f2], workers=[w1.address]) + f3 = c.submit(sum, [f1, f2], key="f3", workers=[w1.address]) await wait(f3) f4 = c.submit(inc, f3, key="f4", workers=[w2.address]) @@ -441,8 +441,7 @@ async def get_data(self, comm, *args, **kwargs): ) await s.remove_worker(w1.address, "stim-id") - while w2.tasks[f3.key].state != "resumed": - await asyncio.sleep(0.1) + await wait_for_state(f3.key, "resumed", w2) assert_story( w2.log, [