From dbe27dd359b0d4a5c4911529d507a2ba35b1e8c8 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 7 Jul 2022 20:04:31 +0200 Subject: [PATCH 1/4] Add all_running_tasks and tests --- .../tests/test_worker_state_machine.py | 133 ++++++++++++++++++ distributed/worker_state_machine.py | 16 ++- 2 files changed, 148 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index f08af1af8ca..b3f256e2579 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -28,6 +28,7 @@ ComputeTaskEvent, ExecuteFailureEvent, ExecuteSuccessEvent, + FreeKeysEvent, GatherDep, Instruction, PauseEvent, @@ -1032,6 +1033,138 @@ def test_gather_priority(ws): ] +def test_running_task_in_all_running_tasks(ws_with_running_task): + ws = ws_with_running_task + task = ws.tasks["x"] + assert task + assert task in ws.all_running_tasks + + +def test_cancelled_running_task_in_all_running_tasks(ws_with_running_task): + ws = ws_with_running_task + + ws.handle_stimulus(FreeKeysEvent(keys=["x"], stimulus_id="cancel")) + task = ws.tasks["x"] + assert task + assert task.state == "cancelled" + assert task in ws.all_running_tasks + + +def test_resumed_running_task_in_all_running_tasks(ws_with_running_task): + ws = ws_with_running_task + + ws.handle_stimulus( + FreeKeysEvent(keys=["x"], stimulus_id="cancel"), + ComputeTaskEvent.dummy( + key="y", + who_has={"x": ["127.0.0.1:1235"]}, + nbytes={"x": 8}, + stimulus_id="compute-y", + ), + ) + task = ws.tasks["x"] + assert task + assert task.state == "resumed" + assert task in ws.all_running_tasks + + +@pytest.mark.xfail(reason="distributed#6565") +def test_successful_running_task_not_in_all_running_tasks(ws_with_running_task): + ws = ws_with_running_task + ws.handle_stimulus( + ExecuteSuccessEvent( + key="x", + value=None, + start=0.0, + stop=1.0, + nbytes=8, + type=None, + stimulus_id="success", + ) + ) + task = ws.tasks["x"] + assert task + assert task.state == "memory" + assert task not in ws.all_running_tasks + + +@pytest.mark.xfail(reason="distributed#6565") +def test_erroneous_running_task_not_in_all_running_tasks(ws_with_running_task): + ws = ws_with_running_task + + ws.handle_stimulus( + ExecuteFailureEvent( + key="x", + start=0.0, + stop=0.0, + exception=Serialize(RuntimeError()), + traceback="failed", + exception_text="exc text", + traceback_text="trc text", + stimulus_id="error", + ) + ) + task = ws.tasks["x"] + assert task + assert task.state == "error" + assert task not in ws.all_running_tasks + + +@pytest.mark.xfail(reason="distributed#6565") +def test_successful_resumed_running_task_not_in_all_running_tasks(ws_with_running_task): + ws = ws_with_running_task + + ws.handle_stimulus( + FreeKeysEvent(keys=["x"], stimulus_id="cancel"), + ComputeTaskEvent.dummy( + key="y", + who_has={"x": ["127.0.0.1:1235"]}, + nbytes={"x": 8}, + stimulus_id="compute-y", + ), + ExecuteSuccessEvent( + key="x", + value=None, + start=0.0, + stop=1.0, + nbytes=8, + type=None, + stimulus_id="success", + ), + ) + task = ws.tasks["x"] + assert task.state == "memory" + assert task not in ws.all_running_tasks + + +@pytest.mark.xfail(reason="distributed#6565") +def test_erroneous_resumed_running_task_not_in_all_running_tasks(ws_with_running_task): + ws = ws_with_running_task + + ws.handle_stimulus( + FreeKeysEvent(keys=["x"], stimulus_id="cancel"), + ComputeTaskEvent.dummy( + key="y", + who_has={"x": ["127.0.0.1:1235"]}, + nbytes={"x": 8}, + stimulus_id="compute-y", + ), + ExecuteFailureEvent( + key="x", + start=0.0, + stop=0.0, + exception=Serialize(RuntimeError()), + traceback="failed", + exception_text="exc text", + traceback_text="trc text", + stimulus_id="error", + ), + ) + task = ws.tasks["x"] + assert task.state == "error" + assert task not in ws.all_running_tasks + + @gen_cluster() async def test_clean_log(s, a, b): """Test that brand new workers start with a clean log""" diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index e5b04da75c2..e78111635e5 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -1981,7 +1981,7 @@ def _transition_cancelled_fetch( ts.state = ts._previous return {}, [] else: - assert ts._previous == "executing" + assert ts._previous in {"executing", "long-running"} ts.state = "resumed" ts._next = "fetch" return {}, [] @@ -3123,6 +3123,10 @@ def validate_state(self) -> None: ts_wait.state in READY | {"executing", "flight", "fetch", "missing"} or ts_wait in self.missing_dep_flight or ts_wait.who_has.issubset(self.in_flight_workers) + or ( + ts_wait.state == "resumed" + and ts_wait._previous in {"executing", "long-running"} + ) ), (ts, ts_wait, self.story(ts), self.story(ts_wait)) # FIXME https://github.com/dask/distributed/issues/6319 # assert self.waiting_for_data_count == waiting_for_data_count @@ -3157,6 +3161,16 @@ def validate_state(self) -> None: for tss in self.data_needed.values(): assert len({ts.key for ts in tss}) == len(tss) + @property + def all_running_tasks(self) -> set[TaskState]: + """All tasks that are currently running. + These are: + - ``ts.status`` == ``executing``, ``long-running``, or ``cancelled`` + - ``ts.status` == ``resumed`` and ``ts._previous`` == ``executing`` or ``long-running`` + """ + # Note: tasks in "cancelled" and "resumed" state are still in either of these sets + return self.executing | {self.tasks[key] for key in self.long_running} + class BaseWorker(abc.ABC): """Wrapper around the :class:`WorkerState` that implements instructions handling. From d393050be937792a3b79a5312a1a98cf26492239 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 8 Jul 2022 09:31:50 +0200 Subject: [PATCH 2/4] Code review Co-authored-by: crusaderky --- .../tests/test_worker_state_machine.py | 63 +++++++------------ distributed/worker_state_machine.py | 8 +-- 2 files changed, 25 insertions(+), 46 deletions(-) diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index b3f256e2579..8e56dbdef80 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -1035,42 +1035,30 @@ def test_gather_priority(ws): def test_running_task_in_all_running_tasks(ws_with_running_task): ws = ws_with_running_task - task = ws.tasks["x"] - assert task - assert task in ws.all_running_tasks - - -def test_cancelled_running_task_in_all_running_tasks(ws_with_running_task): - ws = ws_with_running_task + ts = ws.tasks["x"] + assert ts in ws.all_running_tasks ws.handle_stimulus(FreeKeysEvent(keys=["x"], stimulus_id="cancel")) - task = ws.tasks["x"] - assert task - assert task.state == "cancelled" - assert task in ws.all_running_tasks - - -def test_resumed_running_task_in_all_running_tasks(ws_with_running_task): - ws = ws_with_running_task + assert ts.state == "cancelled" + assert ts in ws.all_running_tasks ws.handle_stimulus( - FreeKeysEvent(keys=["x"], stimulus_id="cancel"), ComputeTaskEvent.dummy( key="y", who_has={"x": ["127.0.0.1:1235"]}, - nbytes={"x": 8}, stimulus_id="compute-y", ), ) - task = ws.tasks["x"] - assert task - assert task.state == "resumed" - assert task in ws.all_running_tasks + assert ts.state == "resumed" + assert ts in ws.all_running_tasks @pytest.mark.xfail(reason="distributed#6565") -def test_successful_running_task_not_in_all_running_tasks(ws_with_running_task): +def test_successful_task_not_in_all_running_tasks(ws_with_running_task): ws = ws_with_running_task + ts = ws.tasks["x"] + assert ts in ws.all_running_tasks + ws.handle_stimulus( ExecuteSuccessEvent( key="x", @@ -1082,15 +1070,16 @@ def test_successful_running_task_not_in_all_running_tasks(ws_with_running_task): stimulus_id="success", ) ) - task = ws.tasks["x"] - assert task - assert task.state == "memory" - assert task not in ws.all_running_tasks + assert ts.state == "memory" + assert ts not in ws.all_running_tasks @pytest.mark.xfail(reason="distributed#6565") -def test_erroneous_running_task_not_in_all_running_tasks(ws_with_running_task): +def test_erred_task_not_in_all_running_tasks(ws_with_running_task): ws = ws_with_running_task + ts = ws.tasks["x"] + assert ts in ws.all_running_tasks + ws.handle_stimulus( ExecuteFailureEvent( @@ -1104,10 +1093,8 @@ def test_erroneous_running_task_not_in_all_running_tasks(ws_with_running_task): stimulus_id="error", ) ) - task = ws.tasks["x"] - assert task - assert task.state == "error" - assert task not in ws.all_running_tasks + assert ts.state == "error" + assert ts not in ws.all_running_tasks @pytest.mark.xfail(reason="distributed#6565") @@ -1119,7 +1106,6 @@ def test_successful_resumed_running_task_not_in_all_running_tasks(ws_with_runnin ComputeTaskEvent.dummy( key="y", who_has={"x": ["127.0.0.1:1235"]}, - nbytes={"x": 8}, stimulus_id="compute-y", ), ExecuteSuccessEvent( @@ -1132,9 +1118,9 @@ def test_successful_resumed_running_task_not_in_all_running_tasks(ws_with_runnin stimulus_id="success", ), ) - task = ws.tasks["x"] - assert task.state == "memory" - assert task not in ws.all_running_tasks + ts = ws.tasks["x"] + assert ts.state == "memory" + assert ts not in ws.all_running_tasks @pytest.mark.xfail(reason="distributed#6565") @@ -1146,7 +1132,6 @@ def test_erroneous_resumed_running_task_not_in_all_running_tasks(ws_with_running ComputeTaskEvent.dummy( key="y", who_has={"x": ["127.0.0.1:1235"]}, - nbytes={"x": 8}, stimulus_id="compute-y", ), ExecuteFailureEvent( @@ -1160,9 +1145,9 @@ def test_erroneous_resumed_running_task_not_in_all_running_tasks(ws_with_running stimulus_id="error", ), ) - task = ws.tasks["x"] - assert task.state == "error" - assert task not in ws.all_running_tasks + ts = ws.tasks["x"] + assert ts.state == "error" + assert ts not in ws.all_running_tasks @gen_cluster() diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index e78111635e5..19573795455 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -3120,13 +3120,7 @@ def validate_state(self) -> None: for ts_wait in ts.waiting_for_data: assert ts_wait.key in self.tasks assert ( - ts_wait.state in READY | {"executing", "flight", "fetch", "missing"} - or ts_wait in self.missing_dep_flight - or ts_wait.who_has.issubset(self.in_flight_workers) - or ( - ts_wait.state == "resumed" - and ts_wait._previous in {"executing", "long-running"} - ) + ts_wait.state in READY | {"executing", "long-running", "resumed", "flight", "fetch", "missing"} ), (ts, ts_wait, self.story(ts), self.story(ts_wait)) # FIXME https://github.com/dask/distributed/issues/6319 # assert self.waiting_for_data_count == waiting_for_data_count From 3bacc5fa2b5d3a7aa9873bd1631b11e49b85fe45 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 8 Jul 2022 09:52:38 +0200 Subject: [PATCH 3/4] Clean up tests --- .../tests/test_worker_state_machine.py | 92 +++++-------------- distributed/worker_state_machine.py | 11 ++- 2 files changed, 33 insertions(+), 70 deletions(-) diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index 8e56dbdef80..eb522f842b2 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -1054,51 +1054,41 @@ def test_running_task_in_all_running_tasks(ws_with_running_task): @pytest.mark.xfail(reason="distributed#6565") -def test_successful_task_not_in_all_running_tasks(ws_with_running_task): +@pytest.mark.parametrize( + "done_ev_cls,done_status", + [ + (ExecuteSuccessEvent, "memory"), + (ExecuteFailureEvent, "error"), + ], +) +def test_done_task_not_in_all_running_tasks( + ws_with_running_task, done_ev_cls, done_status +): ws = ws_with_running_task ts = ws.tasks["x"] assert ts in ws.all_running_tasks ws.handle_stimulus( - ExecuteSuccessEvent( + done_ev_cls.dummy( key="x", - value=None, - start=0.0, - stop=1.0, - nbytes=8, - type=None, stimulus_id="success", ) ) - assert ts.state == "memory" - assert ts not in ws.all_running_tasks - - -@pytest.mark.xfail(reason="distributed#6565") -def test_erred_task_not_in_all_running_tasks(ws_with_running_task): - ws = ws_with_running_task - ts = ws.tasks["x"] - assert ts in ws.all_running_tasks - - - ws.handle_stimulus( - ExecuteFailureEvent( - key="x", - start=0.0, - stop=0.0, - exception=Serialize(RuntimeError()), - traceback="failed", - exception_text="exc text", - traceback_text="trc text", - stimulus_id="error", - ) - ) - assert ts.state == "error" + assert ts.state == done_status assert ts not in ws.all_running_tasks -@pytest.mark.xfail(reason="distributed#6565") -def test_successful_resumed_running_task_not_in_all_running_tasks(ws_with_running_task): +# @pytest.mark.xfail(reason="distributed#6565") +@pytest.mark.parametrize( + "done_ev_cls,done_status", + [ + (ExecuteSuccessEvent, "memory"), + (ExecuteFailureEvent, "error"), + ], +) +def test_done_resumed_running_task_not_in_all_running_tasks( + ws_with_running_task, done_ev_cls, done_status +): ws = ws_with_running_task ws.handle_stimulus( @@ -1108,45 +1098,13 @@ def test_successful_resumed_running_task_not_in_all_running_tasks(ws_with_runnin who_has={"x": ["127.0.0.1:1235"]}, stimulus_id="compute-y", ), - ExecuteSuccessEvent( + done_ev_cls( key="x", - value=None, - start=0.0, - stop=1.0, - nbytes=8, - type=None, stimulus_id="success", ), ) ts = ws.tasks["x"] - assert ts.state == "memory" - assert ts not in ws.all_running_tasks - - -@pytest.mark.xfail(reason="distributed#6565") -def test_erroneous_resumed_running_task_not_in_all_running_tasks(ws_with_running_task): - ws = ws_with_running_task - - ws.handle_stimulus( - FreeKeysEvent(keys=["x"], stimulus_id="cancel"), - ComputeTaskEvent.dummy( - key="y", - who_has={"x": ["127.0.0.1:1235"]}, - stimulus_id="compute-y", - ), - ExecuteFailureEvent( - key="x", - start=0.0, - stop=0.0, - exception=Serialize(RuntimeError()), - traceback="failed", - exception_text="exc text", - traceback_text="trc text", - stimulus_id="error", - ), - ) - ts = ws.tasks["x"] - assert ts.state == "error" + assert ts.state == done_status assert ts not in ws.all_running_tasks diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 19573795455..a35e7b05aae 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -3119,9 +3119,14 @@ def validate_state(self) -> None: waiting_for_data_count += 1 for ts_wait in ts.waiting_for_data: assert ts_wait.key in self.tasks - assert ( - ts_wait.state in READY | {"executing", "long-running", "resumed", "flight", "fetch", "missing"} - ), (ts, ts_wait, self.story(ts), self.story(ts_wait)) + assert ts_wait.state in READY | { + "executing", + "long-running", + "resumed", + "flight", + "fetch", + "missing", + }, (ts, ts_wait, self.story(ts), self.story(ts_wait)) # FIXME https://github.com/dask/distributed/issues/6319 # assert self.waiting_for_data_count == waiting_for_data_count for worker, keys in self.has_what.items(): From 44e54dcac32e9016bb008dbafac61eaa59befd85 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 8 Jul 2022 09:22:52 +0100 Subject: [PATCH 4/4] Code review --- .../tests/test_worker_state_machine.py | 60 +++++++------------ distributed/worker_state_machine.py | 22 +++---- 2 files changed, 32 insertions(+), 50 deletions(-) diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index eb522f842b2..fab4a4cf3fd 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -1033,33 +1033,34 @@ def test_gather_priority(ws): ] +@gen_cluster() +async def test_clean_log(s, a, b): + """Test that brand new workers start with a clean log""" + assert not a.state.log + assert not a.state.stimulus_log + + def test_running_task_in_all_running_tasks(ws_with_running_task): ws = ws_with_running_task + ws2 = "127.0.0.1:2" ts = ws.tasks["x"] assert ts in ws.all_running_tasks - ws.handle_stimulus(FreeKeysEvent(keys=["x"], stimulus_id="cancel")) + ws.handle_stimulus(FreeKeysEvent(keys=["x"], stimulus_id="s1")) assert ts.state == "cancelled" assert ts in ws.all_running_tasks ws.handle_stimulus( - ComputeTaskEvent.dummy( - key="y", - who_has={"x": ["127.0.0.1:1235"]}, - stimulus_id="compute-y", - ), + ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s2") ) assert ts.state == "resumed" assert ts in ws.all_running_tasks -@pytest.mark.xfail(reason="distributed#6565") +@pytest.mark.xfail(reason="distributed#6565, distributed#6692") @pytest.mark.parametrize( "done_ev_cls,done_status", - [ - (ExecuteSuccessEvent, "memory"), - (ExecuteFailureEvent, "error"), - ], + [(ExecuteSuccessEvent, "memory"), (ExecuteFailureEvent, "error")], ) def test_done_task_not_in_all_running_tasks( ws_with_running_task, done_ev_cls, done_status @@ -1068,48 +1069,27 @@ def test_done_task_not_in_all_running_tasks( ts = ws.tasks["x"] assert ts in ws.all_running_tasks - ws.handle_stimulus( - done_ev_cls.dummy( - key="x", - stimulus_id="success", - ) - ) + ws.handle_stimulus(done_ev_cls.dummy("x", stimulus_id="s1")) assert ts.state == done_status assert ts not in ws.all_running_tasks -# @pytest.mark.xfail(reason="distributed#6565") +@pytest.mark.xfail(reason="distributed#6565, distributed#6689, distributed#6692") @pytest.mark.parametrize( "done_ev_cls,done_status", - [ - (ExecuteSuccessEvent, "memory"), - (ExecuteFailureEvent, "error"), - ], + [(ExecuteSuccessEvent, "memory"), (ExecuteFailureEvent, "error")], ) -def test_done_resumed_running_task_not_in_all_running_tasks( +def test_done_resumed_task_not_in_all_running_tasks( ws_with_running_task, done_ev_cls, done_status ): ws = ws_with_running_task + ws2 = "127.0.0.1:2" ws.handle_stimulus( - FreeKeysEvent(keys=["x"], stimulus_id="cancel"), - ComputeTaskEvent.dummy( - key="y", - who_has={"x": ["127.0.0.1:1235"]}, - stimulus_id="compute-y", - ), - done_ev_cls( - key="x", - stimulus_id="success", - ), + FreeKeysEvent(keys=["x"], stimulus_id="s1"), + ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s2"), + done_ev_cls.dummy("x", stimulus_id="s3"), ) ts = ws.tasks["x"] assert ts.state == done_status assert ts not in ws.all_running_tasks - - -@gen_cluster() -async def test_clean_log(s, a, b): - """Test that brand new workers start with a clean log""" - assert not a.state.log - assert not a.state.stimulus_log diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index a35e7b05aae..b31db943c7a 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -1203,6 +1203,7 @@ def handle_stimulus(self, *stims: StateMachineEvent) -> Instructions: @property def executing_count(self) -> int: """Count of tasks currently executing on this worker. + Does not include long running (a.k.a. seceded) and cancelled tasks. See also -------- @@ -1212,6 +1213,17 @@ def executing_count(self) -> int: """ return len(self.executing) + @property + def all_running_tasks(self) -> set[TaskState]: + """All tasks that are currently occupying a thread. + These are: + + - ``ts.status in ("executing", "long-running", "cancelled")`` + - ``ts.status == "resumed" and ts._previous in ("executing", "long-running")`` + """ + # Note: cancelled and resumed tasks are still in either of these sets + return self.executing | {self.tasks[key] for key in self.long_running} + @property def in_flight_tasks_count(self) -> int: """Count of tasks currently being replicated from other workers to this one. @@ -3160,16 +3172,6 @@ def validate_state(self) -> None: for tss in self.data_needed.values(): assert len({ts.key for ts in tss}) == len(tss) - @property - def all_running_tasks(self) -> set[TaskState]: - """All tasks that are currently running. - These are: - - ``ts.status`` == ``executing``, ``long-running``, or ``cancelled`` - - ``ts.status` == ``resumed`` and ``ts._previous`` == ``executing`` or ``long-running`` - """ - # Note: tasks in "cancelled" and "resumed" state are still in either of these sets - return self.executing | {self.tasks[key] for key in self.long_running} - class BaseWorker(abc.ABC): """Wrapper around the :class:`WorkerState` that implements instructions handling.