Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions distributed/tests/test_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ async def test_set_resources(c, s, a):
assert s.workers[a.address].resources == {"A": 3}


@pytest.mark.repeat(100) # DO NOT MERGE
@gen_cluster(
client=True,
nthreads=[
Expand Down
39 changes: 27 additions & 12 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1850,7 +1850,9 @@ def _transition_waiting_constrained(
for dep in ts.dependencies
)
assert all(dep.state == "memory" for dep in ts.dependencies)
assert ts.key not in self.ready
# FIXME https://github.com/dask/distributed/issues/6710
# assert ts.key not in pluck(1, self.ready)
# assert ts.key not in self.constrained
ts.state = "constrained"
self.constrained.append(ts.key)
return self._ensure_computing()
Expand Down Expand Up @@ -1885,7 +1887,9 @@ def _transition_waiting_ready(
) -> RecsInstrs:
if self.validate:
assert ts.state == "waiting"
assert ts.key not in self.ready
# FIXME https://github.com/dask/distributed/issues/6710
# assert ts.key not in pluck(1, self.ready)
# assert ts.key not in self.constrained
assert not ts.waiting_for_data
for dep in ts.dependencies:
assert dep.key in self.data or dep.key in self.actors
Expand Down Expand Up @@ -2165,7 +2169,6 @@ def _transition_executing_memory(
if self.validate:
assert ts.state in ("executing", "long-running")
assert not ts.waiting_for_data
assert ts.key not in self.ready

self.executing.discard(ts)
self.long_running.discard(ts)
Expand All @@ -2182,7 +2185,9 @@ def _transition_constrained_executing(
assert not ts.waiting_for_data
assert ts.key not in self.data
assert ts.state in READY
assert ts.key not in self.ready
# FIXME https://github.com/dask/distributed/issues/6710
# assert ts.key not in pluck(1, self.ready)
# assert ts.key not in self.constrained
for dep in ts.dependencies:
assert dep.key in self.data or dep.key in self.actors

Expand All @@ -2197,7 +2202,9 @@ def _transition_ready_executing(
assert not ts.waiting_for_data
assert ts.key not in self.data
assert ts.state in READY
assert ts.key not in self.ready
# FIXME https://github.com/dask/distributed/issues/6710
# assert ts.key not in pluck(1, self.ready)
# assert ts.key not in self.constrained
assert all(
dep.key in self.data or dep.key in self.actors
for dep in ts.dependencies
Expand Down Expand Up @@ -3082,8 +3089,6 @@ def _validate_task_memory(self, ts: TaskState) -> None:
assert ts.key in self.data or ts.key in self.actors
assert isinstance(ts.nbytes, int)
assert not ts.waiting_for_data
assert ts.key not in self.ready
assert ts.state == "memory"

def _validate_task_executing(self, ts: TaskState) -> None:
if ts.state == "executing":
Expand All @@ -3102,9 +3107,19 @@ def _validate_task_executing(self, ts: TaskState) -> None:
assert dep.key in self.data or dep.key in self.actors

def _validate_task_ready(self, ts: TaskState) -> None:
assert ts.key in pluck(1, self.ready)
if ts.state == "ready":
assert not ts.resource_restrictions
assert ts.key in pluck(1, self.ready)
# FIXME https://github.com/dask/distributed/issues/6710
# assert ts.key not in self.constrained
else:
assert ts.resource_restrictions
assert ts.state == "constrained"
# FIXME https://github.com/dask/distributed/issues/6710
# assert ts.key not in pluck(1, self.ready)
assert ts.key in self.constrained
Comment thread
crusaderky marked this conversation as resolved.

assert ts.key not in self.data
assert ts.state != "executing"
assert not ts.done
assert not ts.waiting_for_data
assert all(
Expand All @@ -3113,15 +3128,15 @@ def _validate_task_ready(self, ts: TaskState) -> None:

def _validate_task_waiting(self, ts: TaskState) -> None:
assert ts.key not in self.data
assert ts.state == "waiting"
assert not ts.done
if ts.dependencies and ts.run_spec:
assert not all(dep.key in self.data for dep in ts.dependencies)

def _validate_task_flight(self, ts: TaskState) -> None:
assert ts.key not in self.data
assert ts in self.in_flight_tasks
assert not any(dep.key in self.ready for dep in ts.dependents)
# FIXME https://github.com/dask/distributed/issues/6710
# assert not any(dep.key in self.ready for dep in ts.dependents)
assert ts.coming_from
assert ts.coming_from in self.in_flight_workers
assert ts.key in self.in_flight_workers[ts.coming_from]
Expand Down Expand Up @@ -3189,7 +3204,7 @@ def validate_task(self, ts: TaskState) -> None:
self._validate_task_cancelled(ts)
elif ts.state == "resumed":
self._validate_task_resumed(ts)
elif ts.state == "ready":
elif ts.state in ("ready", "constrained"):
self._validate_task_ready(ts)
elif ts.state in ("executing", "long-running"):
self._validate_task_executing(ts)
Expand Down