From 2595b17dfff6f82a4be727e5c80e7eeea9908e09 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 5 Jul 2023 14:02:35 +0200 Subject: [PATCH 1/2] Rerun --- distributed/tests/test_worker_state_machine.py | 14 ++++++++++++++ distributed/worker_state_machine.py | 5 +---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index 1c87d1d39f4..16987fb41a0 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -1334,6 +1334,20 @@ def test_gather_dep_failure(ws): ws.validate = False +def test_compute_erred_task(ws): + instructions = ws.handle_stimulus( + ComputeTaskEvent.dummy("x", run_id=1, stimulus_id="s1"), + ExecuteFailureEvent.dummy("x", run_id=1, stimulus_id="s2"), + ComputeTaskEvent.dummy("x", run_id=2, stimulus_id="s3"), + ) + assert instructions == [ + Execute(key="x", stimulus_id="s1"), + TaskErredMsg.match(key="x", run_id=1, stimulus_id="s2"), + Execute(key="x", stimulus_id="s3"), + ] + assert ws.tasks["x"].state == "executing" + + def test_transfer_incoming_metrics(ws): assert ws.transfer_incoming_bytes == 0 assert ws.transfer_incoming_count == 0 diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index db32765d15e..985db35d24b 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -2868,10 +2868,6 @@ def _handle_compute_task(self, ev: ComputeTaskEvent) -> RecsInstrs: ts, run_id=ev.run_id, stimulus_id=ev.stimulus_id ) ) - elif ts.state == "error": - instructions.append( - TaskErredMsg.from_task(ts, run_id=ev.run_id, stimulus_id=ev.stimulus_id) - ) elif ts.state in { "released", "fetch", @@ -2879,6 +2875,7 @@ def _handle_compute_task(self, ev: ComputeTaskEvent) -> RecsInstrs: "missing", "cancelled", "resumed", + "error", }: recommendations[ts] = "waiting" From 7d9c74816e81f8d48e4815b4fac90e7e74843a04 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 5 Jul 2023 14:05:29 +0200 Subject: [PATCH 2/2] Rename --- distributed/tests/test_worker_state_machine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index 16987fb41a0..b06ab6548e7 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -1334,7 +1334,7 @@ def test_gather_dep_failure(ws): ws.validate = False -def test_compute_erred_task(ws): +def test_recompute_erred_task(ws): instructions = ws.handle_stimulus( ComputeTaskEvent.dummy("x", run_id=1, stimulus_id="s1"), ExecuteFailureEvent.dummy("x", run_id=1, stimulus_id="s2"),