From 30f64070a364bcf82092e8a27ed4d5f1684bcb12 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 11 Jul 2022 13:06:40 +0100 Subject: [PATCH 1/4] Validate constrained tasks --- distributed/tests/test_resources.py | 1 + distributed/worker_state_machine.py | 16 +++++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/distributed/tests/test_resources.py b/distributed/tests/test_resources.py index 3ee222a0027..a2e8768c6e5 100644 --- a/distributed/tests/test_resources.py +++ b/distributed/tests/test_resources.py @@ -286,6 +286,7 @@ async def test_set_resources(c, s, a): assert s.workers[a.address].resources == {"A": 3} +@pytest.mark.repeat(100) # DO NOT MERGE @gen_cluster( client=True, nthreads=[ diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 2bb8b640a4e..0f91b72ee6b 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -3083,7 +3083,6 @@ def _validate_task_memory(self, ts: TaskState) -> None: assert isinstance(ts.nbytes, int) assert not ts.waiting_for_data assert ts.key not in self.ready - assert ts.state == "memory" def _validate_task_executing(self, ts: TaskState) -> None: if ts.state == "executing": @@ -3102,9 +3101,17 @@ def _validate_task_executing(self, ts: TaskState) -> None: assert dep.key in self.data or dep.key in self.actors def _validate_task_ready(self, ts: TaskState) -> None: - assert ts.key in pluck(1, self.ready) + if ts.state == "ready": + assert not ts.resource_restrictions + assert ts.key in pluck(1, self.ready) + assert ts.key not in self.constrained + else: + assert ts.resource_restrictions + assert ts.state == "constrained" + assert ts.key not in pluck(1, self.ready) + assert ts.key in self.constrained + assert ts.key not in self.data - assert ts.state != "executing" assert not ts.done assert not ts.waiting_for_data assert all( @@ -3113,7 +3120,6 @@ def _validate_task_ready(self, ts: TaskState) -> None: def _validate_task_waiting(self, ts: TaskState) -> None: assert ts.key not in self.data - assert ts.state == "waiting" assert not ts.done if ts.dependencies and ts.run_spec: assert not all(dep.key in self.data for dep in ts.dependencies) @@ -3189,7 +3195,7 @@ def validate_task(self, ts: TaskState) -> None: self._validate_task_cancelled(ts) elif ts.state == "resumed": self._validate_task_resumed(ts) - elif ts.state == "ready": + elif ts.state in ("ready", "constrained"): self._validate_task_ready(ts) elif ts.state in ("executing", "long-running"): self._validate_task_executing(ts) From 0ad96d8fd3b736701c5669fe0362f6dd09acb62e Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 11 Jul 2022 13:24:11 +0100 Subject: [PATCH 2/4] undo mutual exclusivity --- distributed/worker_state_machine.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 0f91b72ee6b..17f3567d50d 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -3104,11 +3104,13 @@ def _validate_task_ready(self, ts: TaskState) -> None: if ts.state == "ready": assert not ts.resource_restrictions assert ts.key in pluck(1, self.ready) - assert ts.key not in self.constrained + # FIXME https://github.com/dask/distributed/issues/6710 + # assert ts.key not in self.constrained else: assert ts.resource_restrictions assert ts.state == "constrained" - assert ts.key not in pluck(1, self.ready) + # FIXME https://github.com/dask/distributed/issues/6710 + # assert ts.key not in pluck(1, self.ready) assert ts.key in self.constrained assert ts.key not in self.data From 80455f47ef7fa047a090febb4c236a55114825dd Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 11 Jul 2022 13:29:14 +0100 Subject: [PATCH 3/4] Removed assertions --- distributed/worker_state_machine.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 17f3567d50d..ce3031321c0 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -1850,7 +1850,8 @@ def _transition_waiting_constrained( for dep in ts.dependencies ) assert all(dep.state == "memory" for dep in ts.dependencies) - assert ts.key not in self.ready + # FIXME https://github.com/dask/distributed/issues/6710 + # assert ts.key not in pluck(1, self.ready) ts.state = "constrained" self.constrained.append(ts.key) return self._ensure_computing() @@ -1885,7 +1886,8 @@ def _transition_waiting_ready( ) -> RecsInstrs: if self.validate: assert ts.state == "waiting" - assert ts.key not in self.ready + # FIXME https://github.com/dask/distributed/issues/6710 + # assert ts.key not in pluck(1, self.ready) assert not ts.waiting_for_data for dep in ts.dependencies: assert dep.key in self.data or dep.key in self.actors @@ -2165,7 +2167,6 @@ def _transition_executing_memory( if self.validate: assert ts.state in ("executing", "long-running") assert not ts.waiting_for_data - assert ts.key not in self.ready self.executing.discard(ts) self.long_running.discard(ts) @@ -2182,7 +2183,9 @@ def _transition_constrained_executing( assert not ts.waiting_for_data assert ts.key not in self.data assert ts.state in READY - assert ts.key not in self.ready + # FIXME https://github.com/dask/distributed/issues/6710 + # assert ts.key not in pluck(1, self.ready) + # assert ts.key not in self.constrained for dep in ts.dependencies: assert dep.key in self.data or dep.key in self.actors @@ -2197,7 +2200,9 @@ def _transition_ready_executing( assert not ts.waiting_for_data assert ts.key not in self.data assert ts.state in READY - assert ts.key not in self.ready + # FIXME https://github.com/dask/distributed/issues/6710 + # assert ts.key not in pluck(1, self.ready) + # assert ts.key not in self.constrained assert all( dep.key in self.data or dep.key in self.actors for dep in ts.dependencies @@ -3082,7 +3087,6 @@ def _validate_task_memory(self, ts: TaskState) -> None: assert ts.key in self.data or ts.key in self.actors assert isinstance(ts.nbytes, int) assert not ts.waiting_for_data - assert ts.key not in self.ready def _validate_task_executing(self, ts: TaskState) -> None: if ts.state == "executing": @@ -3129,7 +3133,8 @@ def _validate_task_waiting(self, ts: TaskState) -> None: def _validate_task_flight(self, ts: TaskState) -> None: assert ts.key not in self.data assert ts in self.in_flight_tasks - assert not any(dep.key in self.ready for dep in ts.dependents) + # FIXME https://github.com/dask/distributed/issues/6710 + # assert not any(dep.key in self.ready for dep in ts.dependents) assert ts.coming_from assert ts.coming_from in self.in_flight_workers assert ts.key in self.in_flight_workers[ts.coming_from] From 1e65ca4f3860e6404102d16e73fb40d11292f739 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 11 Jul 2022 13:32:16 +0100 Subject: [PATCH 4/4] more commented out assertions --- distributed/worker_state_machine.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index ce3031321c0..c021593192e 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -1852,6 +1852,7 @@ def _transition_waiting_constrained( assert all(dep.state == "memory" for dep in ts.dependencies) # FIXME https://github.com/dask/distributed/issues/6710 # assert ts.key not in pluck(1, self.ready) + # assert ts.key not in self.constrained ts.state = "constrained" self.constrained.append(ts.key) return self._ensure_computing() @@ -1888,6 +1889,7 @@ def _transition_waiting_ready( assert ts.state == "waiting" # FIXME https://github.com/dask/distributed/issues/6710 # assert ts.key not in pluck(1, self.ready) + # assert ts.key not in self.constrained assert not ts.waiting_for_data for dep in ts.dependencies: assert dep.key in self.data or dep.key in self.actors