From b404dd4ad5d735e65a10bd74681b82b39fd235c1 Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 20 May 2022 19:08:06 +0200 Subject: [PATCH 1/3] Do not transition to fetch if dep is missing during handle_compute --- .../diagnostics/tests/test_worker_plugin.py | 26 -- distributed/tests/test_cancelled_state.py | 101 ++++++++ distributed/tests/test_worker.py | 3 +- .../tests/test_worker_state_machine.py | 20 ++ distributed/worker.py | 235 +++++++++++------- distributed/worker_state_machine.py | 16 ++ 6 files changed, 280 insertions(+), 121 deletions(-) diff --git a/distributed/diagnostics/tests/test_worker_plugin.py b/distributed/diagnostics/tests/test_worker_plugin.py index 62074de8ed2..1a3b88910fe 100644 --- a/distributed/diagnostics/tests/test_worker_plugin.py +++ b/distributed/diagnostics/tests/test_worker_plugin.py @@ -195,32 +195,6 @@ class MyCustomPlugin(WorkerPlugin): assert next(iter(w.plugins)).startswith("MyCustomPlugin-") -@gen_cluster(client=True, nthreads=[("", 1)]) -async def test_release_key_deprecated(c, s, a): - class ReleaseKeyDeprecated(WorkerPlugin): - def __init__(self): - self._called = False - - def release_key(self, key, state, cause, reason, report): - # Ensure that the handler still works - self._called = True - assert state == "memory" - assert key == "task" - - def teardown(self, worker): - assert self._called - return super().teardown(worker) - - await c.register_worker_plugin(ReleaseKeyDeprecated()) - - with pytest.warns( - FutureWarning, match="The `WorkerPlugin.release_key` hook is deprecated" - ): - assert await c.submit(inc, 1, key="x") == 2 - while "x" in a.tasks: - await asyncio.sleep(0.01) - - @gen_cluster(client=True, nthreads=[("", 1)]) async def test_assert_no_warning_no_overload(c, s, a): """Assert we do not receive a deprecation warning if we do not overload any diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 409d530277c..67fac061031 100644 --- 
a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -1,5 +1,7 @@ import asyncio +import pytest + import distributed from distributed import Event, Lock, Worker from distributed.client import wait @@ -452,3 +454,102 @@ async def get_data(self, comm, *args, **kwargs): # w1 closed assert await f4 == 6 + + +@pytest.mark.parametrize("wait_for_processing", [True, False]) +@pytest.mark.parametrize("raise_error", [True, False]) +@gen_cluster(client=True) +async def test_resumed_cancelled_handle_compute( + c, s, a, b, raise_error, wait_for_processing +): + """ + Given the history of a task + Executing -> Cancelled -> Fetch -> Resumed + + A handle_compute should properly restore executing + """ + # This test is heavily using set_restrictions to simulate certain scheduler + # decisions of placing keys + + lock_compute = Lock() + await lock_compute.acquire() + enter_compute = Event() + + def block(x, lock, enter_event): + enter_event.set() + with lock: + if raise_error: + raise RuntimeError("test error") + else: + return x + 1 + + f1 = c.submit(inc, 1, key="f1", workers=[a.address]) + f2 = c.submit(inc, f1, key="f2", workers=[a.address]) + f3 = c.submit( + block, + f2, + lock=lock_compute, + enter_event=enter_compute, + key="f3", + workers=[b.address], + ) + + f4 = c.submit(sum, [f1, f3], workers=[b.address]) + + await enter_compute.wait() + + async def release_all_futures(): + futs = [f1, f2, f3, f4] + for fut in futs: + fut.release() + + while any(fut.key in s.tasks for fut in futs): + await asyncio.sleep(0.05) + + await release_all_futures() + await wait_for_state(f3.key, "cancelled", b) + + f1 = c.submit(inc, 1, key="f1", workers=[a.address]) + f2 = c.submit(inc, f1, key="f2", workers=[a.address]) + f3 = c.submit(inc, f2, key="f3", workers=[a.address]) + f4 = c.submit(sum, [f1, f3], workers=[b.address]) + + await wait_for_state(f3.key, "resumed", b) + await release_all_futures() + + f1 = c.submit(inc, 1, key="f1", 
workers=[a.address]) + f2 = c.submit(inc, f1, key="f2", workers=[a.address]) + f3 = c.submit(inc, f2, key="f3", workers=[b.address]) + f4 = c.submit(sum, [f1, f3], workers=[b.address]) + + if wait_for_processing: + await wait_for_state(f3.key, "processing", s) + + await lock_compute.release() + + if not raise_error: + assert await f4 == 4 + 2 + + assert_story( + b.story(f3.key), + expect=[ + (f3.key, "ready", "executing", "executing", {}), + (f3.key, "executing", "released", "cancelled", {}), + (f3.key, "cancelled", "fetch", "resumed", {}), + (f3.key, "resumed", "memory", "memory", {}), + ], + ) + + else: + with pytest.raises(RuntimeError, match="test error"): + await f3 + + assert_story( + b.story(f3.key), + expect=[ + (f3.key, "ready", "executing", "executing", {}), + (f3.key, "executing", "released", "cancelled", {}), + (f3.key, "cancelled", "fetch", "resumed", {}), + (f3.key, "resumed", "error", "error", {}), + ], + ) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index d5b5966cc24..3a427ddb0ad 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3047,11 +3047,12 @@ async def test_task_flight_compute_oserror(c, s, a, b): ), # inc is lost and needs to be recomputed. Therefore, sum is released ("free-keys", ("f1",)), - ("f1", "release-key"), + ("f1", "purge-state"), # The recommendations here are hard to predict. Whatever key is # currently scheduled to be fetched, if any, will be recommended to be # released. ("f1", "waiting", "released", "released", lambda msg: msg["f1"] == "forgotten"), + ("f1", "purge-state"), ("f1", "released", "forgotten", "forgotten", {}), # Now, we actually compute the task *once*. 
This must not cycle back ("f1", "compute-task", "released"), diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index 20ac45fc054..c9cf0eb155b 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -495,3 +495,23 @@ async def test_forget_data_needed(c, s, a, b): x = c.submit(inc, 2, key="x", workers=[a.address]) y = c.submit(inc, x, key="y", workers=[b.address]) assert await y == 4 + + +@gen_cluster(client=True, nthreads=[("", 1)] * 3) +async def test_missing_handle_compute_dependency(c, s, w1, w2, w3): + """Test that it is OK for a dependency to be in state missing if a dependent is asked to be computed""" + + w3.periodic_callbacks["find-missing"].stop() + + f1 = c.submit(inc, 1, key="f1", workers=[w1.address]) + f2 = c.submit(inc, 2, key="f2", workers=[w1.address]) + + w3.handle_acquire_replicas( + keys=[f1.key], who_has={f1.key: [w2.address]}, stimulus_id="acquire" + ) + + await wait_for_state(f1.key, "missing", w3) + + f3 = c.submit(sum, [f1, f2], key="f3", workers=[w3.address]) + + await f3 diff --git a/distributed/worker.py b/distributed/worker.py index 4ba5d551333..38998fcf8ea 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1268,7 +1268,9 @@ async def heartbeat(self): async def handle_scheduler(self, comm): await self.handle_stream(comm) logger.info( - "Connection to scheduler broken. Closing without reporting. Status: %s", + "Connection to scheduler broken. Closing without reporting. 
ID: %s Address %s Status: %s", + self.id, + self.address, self.status, ) await self.close() @@ -1969,19 +1971,25 @@ def handle_compute_task( recommendations: Recs = {} instructions: Instructions = [] - if ts.state in READY | {"executing", "long-running", "waiting", "resumed"}: + if ts.state in READY | { + "executing", + "long-running", + "waiting", + }: pass elif ts.state == "memory": instructions.append( self._get_task_finished_msg(ts, stimulus_id=stimulus_id) ) + elif ts.state == "error": + instructions.append(TaskErredMsg.from_task(ts, stimulus_id=stimulus_id)) elif ts.state in { "released", "fetch", "flight", "missing", "cancelled", - "error", + "resumed", }: recommendations[ts] = "waiting" @@ -2097,7 +2105,7 @@ def transition_released_fetch( def transition_generic_released( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: - self.release_key(ts.key, stimulus_id=stimulus_id) + self._purge_state(ts.key, stimulus_id=stimulus_id) recs: Recs = {} for dependency in ts.dependencies: if ( @@ -2106,6 +2114,7 @@ def transition_generic_released( ): recs[dependency] = "released" + ts.state = "released" if not ts.dependents: recs[ts] = "forgotten" @@ -2127,7 +2136,10 @@ def transition_released_waiting( if dep_ts.state != "memory": ts.waiting_for_data.add(dep_ts) dep_ts.waiters.add(ts) - recommendations[dep_ts] = "fetch" + # TODO: main branch claims this shouldn't be conditional since a + # recent change + if dep_ts.state not in {"fetch", "flight", "missing"}: + recommendations[dep_ts] = "fetch" if ts.waiting_for_data: self.waiting_for_data_count += 1 @@ -2275,15 +2287,10 @@ def transition_generic_error( ts.exception_text = exception_text ts.traceback_text = traceback_text ts.state = "error" - smsg = TaskErredMsg( - key=ts.key, - exception=exception, - traceback=traceback, - exception_text=exception_text, - traceback_text=traceback_text, - thread=self.threads.get(ts.key), - startstops=ts.startstops, + smsg = TaskErredMsg.from_task( + ts, stimulus_id=stimulus_id, + 
thread=self.threads.get(ts.key), ) return {}, [smsg] @@ -2649,7 +2656,7 @@ def transition_released_forgotten( dep.dependents.discard(ts) if dep.state == "released" and not dep.dependents: recommendations[dep] = "forgotten" - + self._purge_state(ts.key, stimulus_id=stimulus_id) # Mark state as forgotten in case it is still referenced ts.state = "forgotten" self.tasks.pop(ts.key, None) @@ -3521,80 +3528,48 @@ def handle_worker_status_change(self, status: str, stimulus_id: str) -> None: # Update status and send confirmation to the Scheduler (see status.setter) self.status = new_status - def release_key( + def _purge_state( self, key: str, - cause: TaskState | None = None, - report: bool = True, *, stimulus_id: str, ) -> None: - try: - if self.validate: - assert not isinstance(key, TaskState) - ts = self.tasks[key] - # needed for legacy notification support - state_before = ts.state - ts.state = "released" - - logger.debug( - "Release key %s", - {"key": key, "cause": cause, "stimulus_id": stimulus_id}, - ) - if cause: - self.log.append( - (key, "release-key", {"cause": cause}, stimulus_id, time()) - ) - else: - self.log.append((key, "release-key", stimulus_id, time())) - if key in self.data: - try: - del self.data[key] - except FileNotFoundError: - logger.error("Tried to delete %s but no file found", exc_info=True) - if key in self.actors: - del self.actors[key] - - self.data_needed.discard(ts) - for w in ts.who_has: - self.has_what[w].discard(ts.key) - self.data_needed_per_worker[w].discard(ts) - ts.who_has.clear() - - if key in self.threads: - del self.threads[key] - - if ts.resource_restrictions is not None: - if ts.state == "executing": - for resource, quantity in ts.resource_restrictions.items(): - self.available_resources[resource] += quantity + """Ensure that TaskState attributes are reset to a neutral default and + Worker-level state associated to the provided key is cleared (e.g. 
+ who_has) + This is idempotent + """ + ts = self.tasks[key] + logger.debug( + "Purge task key: %s state: %s; stimulus_id=%s", + ts.key, + ts.state, + stimulus_id, + ) + self.log.append((key, "purge-state", stimulus_id, time())) + self.data.pop(key, None) + self.actors.pop(key, None) - for d in ts.dependencies: - ts.waiting_for_data.discard(d) - d.waiters.discard(ts) + for worker in ts.who_has: + self.has_what[worker].discard(ts.key) + self.data_needed_per_worker[worker].discard(ts) + ts.who_has.clear() + self.data_needed.discard(ts) - ts.waiting_for_data.clear() - ts.nbytes = None - ts._previous = None - ts._next = None - ts.done = False + self.threads.pop(key, None) - self._executing.discard(ts) - self._in_flight_tasks.discard(ts) + for d in ts.dependencies: + ts.waiting_for_data.discard(d) + d.waiters.discard(ts) - self._notify_plugins( - "release_key", key, state_before, cause, stimulus_id, report - ) - except CommClosedError: - # Batched stream send might raise if it was already closed - pass - except Exception as e: # pragma: no cover - logger.exception(e) - if LOG_PDB: - import pdb + ts.waiting_for_data.clear() + ts.nbytes = None + ts._previous = None + ts._next = None + ts.done = False - pdb.set_trace() - raise + self._executing.discard(ts) + self._in_flight_tasks.discard(ts) ################ # Execute Task # @@ -4184,15 +4159,6 @@ def get_call_stack(self, keys: Collection[str] | None = None) -> dict[str, Any]: def _notify_plugins(self, method_name, *args, **kwargs): for name, plugin in self.plugins.items(): if hasattr(plugin, method_name): - if method_name == "release_key": - warnings.warn( - "The `WorkerPlugin.release_key` hook is deprecated and will be " - "removed in a future version. 
A similar event can now be " - "caught by filtering for a `finish=='released'` event in the " - "`WorkerPlugin.transition` hook.", - FutureWarning, - ) - try: getattr(plugin, method_name)(*args, **kwargs) except Exception: @@ -4227,9 +4193,6 @@ def validate_task_executing(self, ts): assert ts.run_spec is not None assert ts.key not in self.data assert not ts.waiting_for_data - for dep in ts.dependencies: - assert dep.state == "memory", self.story(dep) - assert dep.key in self.data or dep.key in self.actors def validate_task_ready(self, ts): assert ts.key in pluck(1, self.ready) @@ -4294,8 +4257,12 @@ def validate_task_released(self, ts): assert ts not in self._executing assert ts not in self._in_flight_tasks assert ts not in self._missing_dep_flight - assert ts not in self._missing_dep_flight - assert not any(ts.key in has_what for has_what in self.has_what.values()) + + # FIXME the below assert statement is true most of the time. If a task + # performs the transition flight->cancel->waiting, its dependencies are + # normally in released state. However, the compute-task call for their + # previous dependent provided them with who_has, such that this assert + # is no longer true. 
assert not ts.waiting_for_data assert not ts.done assert not ts.exception @@ -4343,7 +4310,7 @@ def validate_task(self, ts): ) raise AssertionError( - f"Invalid TaskState encountered for {ts!r}.\nStory:\n{self.story(ts)}\n" + f"Invalid TaskState encountered on {self.id} for {ts!r}.\nStory:\n{self.story(ts)}\n" ) from e def validate_state(self): @@ -4381,10 +4348,11 @@ def validate_state(self): for worker, keys in self.has_what.items(): assert worker != self.address for k in keys: + assert k in self.tasks, self.story(k) assert worker in self.tasks[k].who_has for ts in self.data_needed: - assert ts.state == "fetch" + assert ts.state == "fetch", self.story(ts) assert self.tasks[ts.key] is ts for worker, tss in self.data_needed_per_worker.items(): for ts in tss: @@ -5205,3 +5173,82 @@ async def benchmark_network( out[size_str] = total / (time() - start) return out + + +[ + ( + "f1", + "ensure-task-exists", + "released", + "compute-task-1654160270.4501", + 1654160270.450457, + ), + ( + "f1", + "released", + "fetch", + "fetch", + {}, + "compute-task-1654160270.4501", + 1654160270.450478, + ), + ( + "gather-dependencies", + "tcp://127.0.0.1:54108", + {"f1"}, + "compute-task-1654160270.4501", + 1654160270.45051, + ), + ( + "f1", + "fetch", + "flight", + "flight", + {}, + "compute-task-1654160270.4501", + 1654160270.4505181, + ), + ( + "request-dep", + "tcp://127.0.0.1:54108", + {"f1"}, + "compute-task-1654160270.4501", + 1654160270.450558, + ), + ( + "busy-gather", + "tcp://127.0.0.1:54108", + {"f1"}, + "compute-task-1654160270.4501", + 1654160270.4519591, + ), + ( + "f1", + "flight", + "fetch", + "fetch", + {}, + "compute-task-1654160270.4501", + 1654160270.4519749, + ), + ("f1", "purge-state", "worker-connect-1654160270.3552", 1654160270.4605172), + ( + "f1", + "fetch", + "released", + "released", + {"f1": "forgotten"}, + "worker-connect-1654160270.3552", + 1654160270.460523, + ), + ("f1", "purge-state", "worker-connect-1654160270.3552", 1654160270.460525), + ( + "f1", + 
"released", + "forgotten", + "forgotten", + {}, + "worker-connect-1654160270.3552", + 1654160270.4605281, + ), +] diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index b456fddb650..65bf2adb0e7 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -300,6 +300,22 @@ def to_dict(self) -> dict[str, Any]: d["status"] = "error" return d + @staticmethod + def from_task( + ts: TaskState, stimulus_id: str, thread: int | None = None + ) -> TaskErredMsg: + assert ts.exception + return TaskErredMsg( + key=ts.key, + exception=ts.exception, + traceback=ts.traceback, + exception_text=ts.exception_text, + traceback_text=ts.traceback_text, + thread=thread, + startstops=ts.startstops, + stimulus_id=stimulus_id, + ) + @dataclass class ReleaseWorkerDataMsg(SendMessageToScheduler): From 853a59f03c02852198e11b2c65e7402033d36f67 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 2 Jun 2022 19:26:44 +0200 Subject: [PATCH 2/3] More fixes --- distributed/tests/test_worker.py | 2 - .../tests/test_worker_state_machine.py | 20 +++ distributed/worker.py | 132 ++++-------------- 3 files changed, 47 insertions(+), 107 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 3a427ddb0ad..4e0cddd9f3b 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3047,12 +3047,10 @@ async def test_task_flight_compute_oserror(c, s, a, b): ), # inc is lost and needs to be recomputed. Therefore, sum is released ("free-keys", ("f1",)), - ("f1", "purge-state"), # The recommendations here are hard to predict. Whatever key is # currently scheduled to be fetched, if any, will be recommended to be # released. ("f1", "waiting", "released", "released", lambda msg: msg["f1"] == "forgotten"), - ("f1", "purge-state"), ("f1", "released", "forgotten", "forgotten", {}), # Now, we actually compute the task *once*. 
This must not cycle back ("f1", "compute-task", "released"), diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index c9cf0eb155b..fdc7f01237f 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -505,6 +505,7 @@ async def test_missing_handle_compute_dependency(c, s, w1, w2, w3): f1 = c.submit(inc, 1, key="f1", workers=[w1.address]) f2 = c.submit(inc, 2, key="f2", workers=[w1.address]) + await wait_for_state(f1.key, "memory", w1) w3.handle_acquire_replicas( keys=[f1.key], who_has={f1.key: [w2.address]}, stimulus_id="acquire" @@ -515,3 +516,22 @@ async def test_missing_handle_compute_dependency(c, s, w1, w2, w3): f3 = c.submit(sum, [f1, f2], key="f3", workers=[w3.address]) await f3 + + +@gen_cluster(client=True, nthreads=[("", 1)] * 3) +async def test_missing_to_waiting(c, s, w1, w2, w3): + w3.periodic_callbacks["find-missing"].stop() + + f1 = c.submit(inc, 1, key="f1", workers=[w1.address], allow_other_workers=True) + await wait_for_state(f1.key, "memory", w1) + + w3.handle_acquire_replicas( + keys=[f1.key], who_has={f1.key: [w2.address]}, stimulus_id="acquire" + ) + + await wait_for_state(f1.key, "missing", w3) + + await w2.close() + await w1.close() + + await f1 diff --git a/distributed/worker.py b/distributed/worker.py index 38998fcf8ea..adb4a3cf82c 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2058,12 +2058,21 @@ def transition_generic_fetch(self, ts: TaskState, stimulus_id: str) -> RecsInstr # _select_keys_for_gather(). 
return {}, [EnsureCommunicatingAfterTransitions(stimulus_id=stimulus_id)] + def transition_missing_waiting( + self, ts: TaskState, *, stimulus_id: str + ) -> RecsInstrs: + self._missing_dep_flight.discard(ts) + self._purge_state(ts) + return self.transition_released_waiting(ts, stimulus_id=stimulus_id) + def transition_missing_fetch( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if self.validate: assert ts.state == "missing" - assert ts.who_has + + if not ts.who_has: + return {}, [] self._missing_dep_flight.discard(ts) return self.transition_generic_fetch(ts, stimulus_id=stimulus_id) @@ -2105,7 +2114,7 @@ def transition_released_fetch( def transition_generic_released( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: - self._purge_state(ts.key, stimulus_id=stimulus_id) + self._purge_state(ts) recs: Recs = {} for dependency in ts.dependencies: if ( @@ -2127,7 +2136,6 @@ def transition_released_waiting( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if self.validate: - assert ts.state == "released" assert all(d.key in self.tasks for d in ts.dependencies) recommendations: Recs = {} @@ -2136,10 +2144,7 @@ def transition_released_waiting( if dep_ts.state != "memory": ts.waiting_for_data.add(dep_ts) dep_ts.waiters.add(ts) - # TODO: main branch claims this shouldn't be conditional since a - # recent change - if dep_ts.state not in {"fetch", "flight", "missing"}: - recommendations[dep_ts] = "fetch" + recommendations[dep_ts] = "fetch" if ts.waiting_for_data: self.waiting_for_data_count += 1 @@ -2447,7 +2452,6 @@ def transition_cancelled_released( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if not ts.done: - ts._next = "released" return {}, [] self._executing.discard(ts) self._in_flight_tasks.discard(ts) @@ -2656,7 +2660,7 @@ def transition_released_forgotten( dep.dependents.discard(ts) if dep.state == "released" and not dep.dependents: recommendations[dep] = "forgotten" - self._purge_state(ts.key, stimulus_id=stimulus_id) + 
self._purge_state(ts) # Mark state as forgotten in case it is still referenced ts.state = "forgotten" self.tasks.pop(ts.key, None) @@ -2708,6 +2712,7 @@ def transition_released_forgotten( ("missing", "fetch"): transition_missing_fetch, ("missing", "released"): transition_missing_released, ("missing", "error"): transition_generic_error, + ("missing", "waiting"): transition_missing_waiting, ("ready", "error"): transition_generic_error, ("ready", "executing"): transition_ready_executing, ("ready", "released"): transition_generic_released, @@ -2793,7 +2798,9 @@ def _transition( (recs, instructions), self._transition(ts, finish, *args, stimulus_id=stimulus_id), ) - except InvalidTransition: + # ValueError may be raised by merge_recs_instructions + # TODO: should merge_recs raise InvalidTransition? + except (ValueError, InvalidTransition): self.log_event( "invalid-worker-transition", { @@ -3528,25 +3535,14 @@ def handle_worker_status_change(self, status: str, stimulus_id: str) -> None: # Update status and send confirmation to the Scheduler (see status.setter) self.status = new_status - def _purge_state( - self, - key: str, - *, - stimulus_id: str, - ) -> None: + def _purge_state(self, ts: TaskState) -> None: """Ensure that TaskState attributes are reset to a neutral default and Worker-level state associated to the provided key is cleared (e.g. 
who_has) This is idempotent """ - ts = self.tasks[key] - logger.debug( - "Purge task key: %s state: %s; stimulus_id=%s", - ts.key, - ts.state, - stimulus_id, - ) - self.log.append((key, "purge-state", stimulus_id, time())) + key = ts.key + logger.debug("Purge task key: %s state: %s; stimulus_id=%s", ts.key, ts.state) self.data.pop(key, None) self.actors.pop(key, None) @@ -3976,7 +3972,8 @@ def _(self, ev: FindMissingEvent) -> RecsInstrs: return {}, [] if self.validate: - assert not any(ts.who_has for ts in self._missing_dep_flight) + for ts in self._missing_dep_flight: + assert not ts.who_has, self.story(ts) smsg = RequestRefreshWhoHasMsg( keys=[ts.key for ts in self._missing_dep_flight], @@ -4193,6 +4190,9 @@ def validate_task_executing(self, ts): assert ts.run_spec is not None assert ts.key not in self.data assert not ts.waiting_for_data + for dep in ts.dependencies: + assert dep.state == "memory", self.story(dep) + assert dep.key in self.data or dep.key in self.actors def validate_task_ready(self, ts): assert ts.key in pluck(1, self.ready) @@ -4240,7 +4240,8 @@ def validate_task_missing(self, ts): def validate_task_cancelled(self, ts): assert ts.key not in self.data assert ts._previous in {"long-running", "executing", "flight"} - assert ts._next is None # We'll always transition to released after it is done + # We'll always transition to released after it is done + assert ts._next is None, (ts.key, ts._next, self.story(ts)) def validate_task_resumed(self, ts): assert ts.key not in self.data @@ -5173,82 +5174,3 @@ async def benchmark_network( out[size_str] = total / (time() - start) return out - - -[ - ( - "f1", - "ensure-task-exists", - "released", - "compute-task-1654160270.4501", - 1654160270.450457, - ), - ( - "f1", - "released", - "fetch", - "fetch", - {}, - "compute-task-1654160270.4501", - 1654160270.450478, - ), - ( - "gather-dependencies", - "tcp://127.0.0.1:54108", - {"f1"}, - "compute-task-1654160270.4501", - 1654160270.45051, - ), - ( - "f1", - 
"fetch", - "flight", - "flight", - {}, - "compute-task-1654160270.4501", - 1654160270.4505181, - ), - ( - "request-dep", - "tcp://127.0.0.1:54108", - {"f1"}, - "compute-task-1654160270.4501", - 1654160270.450558, - ), - ( - "busy-gather", - "tcp://127.0.0.1:54108", - {"f1"}, - "compute-task-1654160270.4501", - 1654160270.4519591, - ), - ( - "f1", - "flight", - "fetch", - "fetch", - {}, - "compute-task-1654160270.4501", - 1654160270.4519749, - ), - ("f1", "purge-state", "worker-connect-1654160270.3552", 1654160270.4605172), - ( - "f1", - "fetch", - "released", - "released", - {"f1": "forgotten"}, - "worker-connect-1654160270.3552", - 1654160270.460523, - ), - ("f1", "purge-state", "worker-connect-1654160270.3552", 1654160270.460525), - ( - "f1", - "released", - "forgotten", - "forgotten", - {}, - "worker-connect-1654160270.3552", - 1654160270.4605281, - ), -] From 665ebb4991599e91039586f02b49939a2f891e10 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 6 Jun 2022 15:01:32 +0100 Subject: [PATCH 3/3] Update distributed/worker.py --- distributed/worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/worker.py b/distributed/worker.py index adb4a3cf82c..503c64a2490 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -4264,6 +4264,8 @@ def validate_task_released(self, ts): # normally in released state. However, the compute-task call for their # previous dependent provided them with who_has, such that this assert # is no longer true. + # assert not any(ts.key in has_what for has_what in self.has_what.values()) + assert not ts.waiting_for_data assert not ts.done assert not ts.exception