From 20facf783a986bb78a35df04c521e812ec349247 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 16 Jun 2023 20:19:08 +0200 Subject: [PATCH 01/13] Sign task-erred --- distributed/scheduler.py | 8 ++++++++ distributed/worker_state_machine.py | 2 ++ 2 files changed, 10 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b01d3f06ad0..fd4f9ed60c9 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4876,6 +4876,7 @@ def stimulus_task_erred( exception=None, stimulus_id=None, traceback=None, + run_id=None, **kwargs, ): """Mark that a task has erred on a particular worker""" @@ -4885,6 +4886,13 @@ def stimulus_task_erred( if ts is None or ts.state != "processing": return {}, {}, {} + if ( + ts.run_id != run_id + and ts.processing_on + and ts.processing_on.address == worker + ): + return self._transition(key, "waiting", stimulus_id) + if ts.retries > 0: ts.retries -= 1 return self._transition(key, "waiting", stimulus_id) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 504eddfe40e..1fe99468d4a 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -482,6 +482,7 @@ class TaskErredMsg(SendMessageToScheduler): op = "task-erred" key: str + run_id: int exception: Serialize traceback: Serialize | None exception_text: str @@ -502,6 +503,7 @@ def from_task( assert ts.exception return TaskErredMsg( key=ts.key, + run_id=ts.run_id, exception=ts.exception, traceback=ts.traceback, exception_text=ts.exception_text, From 286e4b643630f8d933bd024a492955507b1acdff Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 19 Jun 2023 14:47:51 +0200 Subject: [PATCH 02/13] Better signing --- distributed/scheduler.py | 10 ++++------ distributed/tests/test_cancelled_state.py | 13 +++++++++++-- distributed/worker.py | 3 +++ distributed/worker_state_machine.py | 18 +++++++++++++++--- 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index fd4f9ed60c9..3a72976c47b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4886,12 +4886,10 @@ def stimulus_task_erred( if ts is None or ts.state != "processing": return {}, {}, {} - if ( - ts.run_id != run_id - and ts.processing_on - and ts.processing_on.address == worker - ): - return self._transition(key, "waiting", stimulus_id) + if ts.run_id != run_id: + if ts.processing_on and ts.processing_on.address == worker: + return self._transition(key, "released", stimulus_id) + return {}, {}, {} if ts.retries > 0: ts.retries -= 1 diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 7f95ab36ddc..87eaec7dc5f 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -569,8 +569,7 @@ async def release_all_futures(): ) elif wait_for_processing and raise_error: - with pytest.raises(RuntimeError, match="test error"): - await f3 + assert await f4 == 4 + 2 assert_story( b.state.story(f3.key), @@ -581,6 +580,16 @@ async def release_all_futures(): (f3.key, "resumed", "released", "cancelled", {}), (f3.key, "cancelled", "waiting", "executing", {}), (f3.key, "executing", "error", "error", {}), + ( + f3.key, + "error", + "released", + "released", + {f2.key: "released", f3.key: "forgotten"}, + ), + (f3.key, "released", "forgotten", "forgotten", {f2.key: "forgotten"}), + (f3.key, "ready", "executing", "executing", {}), + (f3.key, "executing", "memory", "memory", {}) # FIXME: (distributed#7489) ], ) diff --git a/distributed/worker.py b/distributed/worker.py index 9e7bce3bd5d..e0e0ca441ee 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2232,6 +2232,7 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent: return ExecuteFailureEvent.from_exception( exc, key=key, + run_id=run_id, stimulus_id=f"run-spec-deserialize-failed-{time()}", ) @@ -2356,6 +2357,7 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent: return ExecuteFailureEvent.from_exception( result, key=key, + run_id=run_id, start=result["start"], stop=result["stop"], stimulus_id=f"task-erred-{time()}", @@ -2366,6 +2368,7 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent: return ExecuteFailureEvent.from_exception( exc, key=key, + run_id=run_id, stimulus_id=f"execute-unknown-error-{time()}", ) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 1fe99468d4a..89468b547a1 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -498,12 +498,12 @@ def to_dict(self) -> dict[str, Any]: @staticmethod def from_task( - ts: TaskState, stimulus_id: str, thread: int | None = None + ts: TaskState, run_id: int, stimulus_id: str, thread: int | None = None ) -> TaskErredMsg: assert ts.exception return TaskErredMsg( key=ts.key, - run_id=ts.run_id, + run_id=run_id, exception=ts.exception, traceback=ts.traceback, exception_text=ts.exception_text, @@ -905,6 +905,7 @@ def dummy( @dataclass class ExecuteFailureEvent(ExecuteDoneEvent): + run_id: int # FIXME: Utilize the run ID in all ExecuteDoneEvents start: float | None stop: float | None exception: Serialize @@ -923,6 +924,7 @@ def from_exception( err_or_msg: BaseException | ErrorMessage, *, key: str, + run_id: int, start: float | None = None, stop: float | None = None, stimulus_id: str, @@ -934,6 +936,7 @@ def from_exception( return cls( key=key, + run_id=run_id, start=start, stop=stop, exception=msg["exception"], @@ -947,6 +950,7 @@ def from_exception( def dummy( key: str, *, + run_id: int = 1, stimulus_id: str, ) -> ExecuteFailureEvent: """Build a dummy event, with most attributes set to a reasonable default. @@ -954,6 +958,7 @@ def dummy( """ return ExecuteFailureEvent( key=key, + run_id=run_id, start=None, stop=None, exception=Serialize(None), @@ -2023,6 +2028,7 @@ def _transition_waiting_ready( def _transition_generic_error( self, ts: TaskState, + run_id: int, exception: Serialize, traceback: Serialize | None, exception_text: str, @@ -2037,6 +2043,7 @@ def _transition_generic_error( ts.state = "error" smsg = TaskErredMsg.from_task( ts, + run_id=run_id, stimulus_id=stimulus_id, thread=self.threads.get(ts.key), ) @@ -2046,6 +2053,7 @@ def _transition_generic_error( def _transition_resumed_error( self, ts: TaskState, + run_id: int, exception: Serialize, traceback: Serialize | None, exception_text: str, @@ -2861,7 +2869,9 @@ def _handle_compute_task(self, ev: ComputeTaskEvent) -> RecsInstrs: ) ) elif ts.state == "error": - instructions.append(TaskErredMsg.from_task(ts, stimulus_id=ev.stimulus_id)) + instructions.append( + TaskErredMsg.from_task(ts, run_id=ev.run_id, stimulus_id=ev.stimulus_id) + ) elif ts.state in { "released", "fetch", @@ -3040,6 +3050,7 @@ def _handle_gather_dep_failure(self, ev: GatherDepFailureEvent) -> RecsInstrs: recommendations: Recs = { ts: ( "error", + ts.run_id, ev.exception, ev.traceback, ev.exception_text, @@ -3177,6 +3188,7 @@ def _handle_execute_failure(self, ev: ExecuteFailureEvent) -> RecsInstrs: ) recs[ts] = ( "error", + ev.run_id, ev.exception, ev.traceback, ev.exception_text, From b207510b605d4e21338f9604ed02271a41f5ffd8 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 19 Jun 2023 14:50:25 +0200 Subject: [PATCH 03/13] Comment --- distributed/tests/test_cancelled_state.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 87eaec7dc5f..9ec8ee52c81 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -589,8 +589,7 @@ async def release_all_futures(): ), (f3.key, "released", "forgotten", "forgotten", {f2.key: "forgotten"}), (f3.key, "ready", "executing", "executing", {}), - (f3.key, "executing", "memory", "memory", {}) - # FIXME: (distributed#7489) + (f3.key, "executing", "memory", "memory", {}), ], ) else: From baad8df99610d0f3912e1fab939490dcbabe359d Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 19 Jun 2023 15:25:31 +0200 Subject: [PATCH 04/13] Fix tests --- distributed/tests/test_worker_state_machine.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index b2817363001..1c87d1d39f4 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -531,6 +531,7 @@ def test_executefailure_to_dict(): ev = ExecuteFailureEvent( stimulus_id="test", key="x", + run_id=1, start=123.4, stop=456.7, exception=Serialize(ValueError("foo")), @@ -546,6 +547,7 @@ def test_executefailure_to_dict(): "stimulus_id": "test", "handled": 11.22, "key": "x", + "run_id": 1, "start": 123.4, "stop": 456.7, "exception": "", @@ -571,6 +573,7 @@ def test_executefailure_dummy(): ev = ExecuteFailureEvent.dummy("x", stimulus_id="s") assert ev == ExecuteFailureEvent( key="x", + run_id=1, start=None, stop=None, exception=Serialize(None), From c8bf5833cd91e85c5f7e5e85bb944b716c0a163e Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 19 Jun 2023 15:42:33 +0200 Subject: [PATCH 05/13] Fix transition --- distributed/worker_state_machine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 89468b547a1..1d475d1f332 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -2028,11 +2028,11 @@ def _transition_waiting_ready( def _transition_generic_error( self, ts: TaskState, - run_id: int, exception: Serialize, traceback: Serialize | None, exception_text: str, traceback_text: str, + run_id: int, *, stimulus_id: str, ) -> RecsInstrs: @@ -2448,7 +2448,7 @@ def _transition_to_memory( # Third-party MutableMappings (dask-cuda etc.) may have other use cases # for this. msg = error_message(e) - return {ts: tuple(msg.values())}, [] + return {ts: tuple(msg.values()) + (run_id,)}, [] stop = time() if stop - start > 0.005: From 952848c007cf5fa23a03a9063c55979a6ff39065 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 19 Jun 2023 16:03:11 +0200 Subject: [PATCH 06/13] Parametrize test --- distributed/tests/test_cancelled_state.py | 65 +++++++++++++++-------- 1 file changed, 44 insertions(+), 21 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 9ec8ee52c81..120f7e0b550 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -596,8 +596,9 @@ async def release_all_futures(): assert False, "unreachable" +@pytest.mark.parametrize("raise_error", [True, False]) @gen_cluster(client=True) -async def test_cancelled_handle_compute(c, s, a, b): +async def test_cancelled_handle_compute(c, s, a, b, raise_error): """ Given the history of a task executing -> cancelled @@ -619,6 +620,8 @@ async def test_cancelled_handle_compute(c, s, a, b): def block(x, lock, enter_event, exit_event): enter_event.set() with lock: + if raise_error: + raise RuntimeError("test error") return x + 1 f1 = c.submit(inc, 1, key="f1", workers=[a.address]) @@ -658,26 +661,46 @@ async def release_all_futures(): assert await f4 == 4 + 2 - story = b.state.story(f3.key) - assert_story( - b.state.story(f3.key), - expect=[ - (f3.key, "ready", "executing", "executing", {}), - (f3.key, "executing", "released", "cancelled", {}), - (f3.key, "cancelled", "waiting", "executing", {}), - (f3.key, "executing", "memory", "memory", {}), - ( - f3.key, - "memory", - "released", - "released", - {f2.key: "released", f3.key: "forgotten"}, - ), - (f3.key, "released", "forgotten", "forgotten", {f2.key: "forgotten"}), - (f3.key, "ready", "executing", "executing", {}), - (f3.key, "executing", "memory", "memory", {}), - ], - ) + if raise_error: + assert_story( + b.state.story(f3.key), + expect=[ + (f3.key, "ready", "executing", "executing", {}), + (f3.key, "executing", "released", "cancelled", {}), + (f3.key, "cancelled", "waiting", "executing", {}), + (f3.key, "executing", "error", "error", {}), + ( + f3.key, + "error", + "released", + "released", + {f2.key: "released", f3.key: "forgotten"}, + ), + (f3.key, "released", "forgotten", "forgotten", {f2.key: "forgotten"}), + (f3.key, "ready", "executing", "executing", {}), + (f3.key, "executing", "memory", "memory", {}), + ], + ) + else: + assert_story( + b.state.story(f3.key), + expect=[ + (f3.key, "ready", "executing", "executing", {}), + (f3.key, "executing", "released", "cancelled", {}), + (f3.key, "cancelled", "waiting", "executing", {}), + (f3.key, "executing", "memory", "memory", {}), + ( + f3.key, + "memory", + "released", + "released", + {f2.key: "released", f3.key: "forgotten"}, + ), + (f3.key, "released", "forgotten", "forgotten", {f2.key: "forgotten"}), + (f3.key, "ready", "executing", "executing", {}), + (f3.key, "executing", "memory", "memory", {}), + ], + ) @pytest.mark.parametrize("intermediate_state", ["resumed", "cancelled"]) From 22365b0ec742ffc490fd575b310f0bd4eac60a76 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 19 Jun 2023 16:05:34 +0200 Subject: [PATCH 07/13] Adjust flaky case --- distributed/tests/test_cancelled_state.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 120f7e0b550..c44842581dd 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -530,13 +530,6 @@ async def release_all_futures(): (f3.key, "executing", "released", "cancelled", {}), (f3.key, "cancelled", "fetch", "resumed", {}), (f3.key, "resumed", "released", "cancelled", {}), - ( - f3.key, - "cancelled", - "error", - "released", - {f2.key: "released", f3.key: "forgotten"}, - ), (f3.key, "released", "forgotten", "forgotten", {f2.key: "forgotten"}), (f3.key, "ready", "executing", "executing", {}), (f3.key, "executing", "memory", "memory", {}), From 041d0531fdde8bf087c292f10c865a30245d2e5e Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 19 Jun 2023 16:52:33 +0200 Subject: [PATCH 08/13] Adjust test --- distributed/tests/test_cancelled_state.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index c44842581dd..8f0c1d1d1c1 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -488,6 +488,9 @@ async def release_all_futures(): await lock_compute.release() await exit_compute.wait() + while f3.key in b.state.tasks: + await asyncio.sleep(0.01) + f1 = c.submit(inc, 1, key="f1", workers=[a.address]) f2 = c.submit(inc, f1, key="f2", workers=[a.address]) f3 = c.submit(inc, f2, key="f3", workers=[b.address]) @@ -530,6 +533,13 @@ async def release_all_futures(): (f3.key, "executing", "released", "cancelled", {}), (f3.key, "cancelled", "fetch", "resumed", {}), (f3.key, "resumed", "released", "cancelled", {}), + ( + f3.key, + "cancelled", + "error", + "released", + {f2.key: "released", f3.key: "forgotten"}, + ), (f3.key, "released", "forgotten", "forgotten", {f2.key: "forgotten"}), (f3.key, "ready", "executing", "executing", {}), (f3.key, "executing", "memory", "memory", {}), From 1669a3f8086c1c2cfb8c7d1ae74b0a1186b98de8 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 19 Jun 2023 17:04:21 +0200 Subject: [PATCH 09/13] Docs --- distributed/tests/test_cancelled_state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 8f0c1d1d1c1..cfa7fc18013 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -821,7 +821,7 @@ def test_workerstate_executing_failure_to_fetch(ws_with_running_task): - executing -> long-running -> cancelled -> resumed(fetch) The task execution later terminates with a failure. - This is an edge case interaction between work stealing and a task that does not + This is an edge case interaction involving task cancellation and a task that does not deterministically succeed or fail when run multiple times or on different workers. Test that the task is fetched from the other worker. This is to avoid having to deal From 81d078a401a672e8aca2975f03b477e25da2b50f Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 19 Jun 2023 18:53:29 +0200 Subject: [PATCH 10/13] Fix transitions --- distributed/worker_state_machine.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 1d475d1f332..d71a8e0258f 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -2053,11 +2053,11 @@ def _transition_generic_error( def _transition_resumed_error( self, ts: TaskState, - run_id: int, exception: Serialize, traceback: Serialize | None, exception_text: str, traceback_text: str, + run_id: int, *, stimulus_id: str, ) -> RecsInstrs: @@ -3050,11 +3050,11 @@ def _handle_gather_dep_failure(self, ev: GatherDepFailureEvent) -> RecsInstrs: recommendations: Recs = { ts: ( "error", - ts.run_id, ev.exception, ev.traceback, ev.exception_text, ev.traceback_text, + ts.run_id, ) for ts in self._gather_dep_done_common(ev) } @@ -3188,11 +3188,11 @@ def _handle_execute_failure(self, ev: ExecuteFailureEvent) -> RecsInstrs: ) recs[ts] = ( "error", - ev.run_id, ev.exception, ev.traceback, ev.exception_text, ev.traceback_text, + ev.run_id, ) return recs, instr From 320824dccab341a72570986953681a37f67cc006 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 21 Jun 2023 12:51:15 +0200 Subject: [PATCH 11/13] Add test case --- distributed/tests/test_cancelled_state.py | 112 +++++++++++++++++++++- 1 file changed, 111 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index cfa7fc18013..a1de34d6c83 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -606,7 +606,8 @@ async def test_cancelled_handle_compute(c, s, a, b, raise_error): Given the history of a task executing -> cancelled - A handle_compute should properly restore executing. + A handle_compute should cause the result of the cancelled task to be rejected + by the scheduler and the task to be re-run. See Also -------- @@ -706,6 +707,115 @@ async def release_all_futures(): ) +@gen_cluster(client=True) +async def test_cancelled_task_error_rejected(c, s, a, b): + """ + Given the history of a task + executing -> cancelled + + An error in the cancelled task is rejected by the scheduler and superseded + by a more recent run on another worker. + + """ + # This test is heavily using set_restrictions to simulate certain scheduler + # decisions of placing keys + + lock_erring = Lock() + enter_compute_erring = Event() + exit_compute_erring = Event() + lock_successful = Lock() + enter_compute_successful = Event() + exit_compute_successful = Event() + + await lock_erring.acquire() + await lock_successful.acquire() + + def block(x, lock, enter_event, exit_event, raise_error): + enter_event.set() + try: + with lock: + if raise_error: + raise RuntimeError("test_error") + return x + 1 + finally: + exit_event.set() + + f1 = c.submit(inc, 1, key="f1", workers=[a.address]) + f2 = c.submit(inc, f1, key="f2", workers=[a.address]) + f3 = c.submit( + block, + f2, + lock=lock_erring, + enter_event=enter_compute_erring, + exit_event=exit_compute_erring, + raise_error=True, + key="f3", + workers=[b.address], + ) + + f4 = c.submit(sum, [f1, f3], key="f4", workers=[b.address]) + + await enter_compute_erring.wait() + + async def release_all_futures(): + futs = [f1, f2, f3, f4] + for fut in futs: + fut.release() + + while any(fut.key in s.tasks for fut in futs): + await asyncio.sleep(0.05) + + await release_all_futures() + await wait_for_state(f3.key, "cancelled", b) + + f1 = c.submit(inc, 1, key="f1", workers=[a.address]) + f2 = c.submit(inc, f1, key="f2", workers=[a.address]) + f3 = c.submit( + block, + f2, + lock=lock_successful, + enter_event=enter_compute_successful, + exit_event=exit_compute_successful, + raise_error=False, + key="f3", + workers=[a.address], + ) + f4 = c.submit(sum, [f1, f3], key="f4", workers=[b.address]) + + await wait_for_state(f3.key, "processing", s) + await enter_compute_successful.wait() + + await lock_erring.release() + while f3.key in b.state.tasks: + await asyncio.sleep(0.05) + await lock_successful.release() + assert await f4 == 4 + 2 + + assert_story( + b.state.story(f3.key), + expect=[ + (f3.key, "ready", "executing", "executing", {}), + (f3.key, "executing", "released", "cancelled", {}), + ( + f3.key, + "cancelled", + "error", + "released", + {f2.key: "released", f3.key: "forgotten"}, + ), + (f3.key, "released", "forgotten", "forgotten", {f2.key: "forgotten"}), + ], + ) + + assert_story( + a.state.story(f3.key), + expect=[ + (f3.key, "ready", "executing", "executing", {}), + (f3.key, "executing", "memory", "memory", {}), + ], + ) + + @pytest.mark.parametrize("intermediate_state", ["resumed", "cancelled"]) @pytest.mark.parametrize("close_worker", [False, True]) @gen_cluster(client=True, config={"distributed.comm.timeouts.connect": "500ms"}) From 554322ee45a54a0a1263d48d7ff06b3a02b048a7 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 21 Jun 2023 13:27:52 +0200 Subject: [PATCH 12/13] Improve test --- distributed/tests/test_cancelled_state.py | 49 ++++++++++++----------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index a1de34d6c83..55c33f54928 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -14,6 +14,7 @@ _LockedCommPool, assert_story, async_poll_for, + freeze_batched_send, gen_cluster, inc, lock_inc, @@ -765,29 +766,29 @@ async def release_all_futures(): while any(fut.key in s.tasks for fut in futs): await asyncio.sleep(0.05) - await release_all_futures() - await wait_for_state(f3.key, "cancelled", b) + with freeze_batched_send(s.stream_comms[b.address]): + await release_all_futures() + + f1 = c.submit(inc, 1, key="f1", workers=[a.address]) + f2 = c.submit(inc, f1, key="f2", workers=[a.address]) + f3 = c.submit( + block, + f2, + lock=lock_successful, + enter_event=enter_compute_successful, + exit_event=exit_compute_successful, + raise_error=False, + key="f3", + workers=[a.address], + ) + f4 = c.submit(sum, [f1, f3], key="f4", workers=[b.address]) - f1 = c.submit(inc, 1, key="f1", workers=[a.address]) - f2 = c.submit(inc, f1, key="f2", workers=[a.address]) - f3 = c.submit( - block, - f2, - lock=lock_successful, - enter_event=enter_compute_successful, - exit_event=exit_compute_successful, - raise_error=False, - key="f3", - workers=[a.address], - ) - f4 = c.submit(sum, [f1, f3], key="f4", workers=[b.address]) + await wait_for_state(f3.key, "processing", s) + await enter_compute_successful.wait() - await wait_for_state(f3.key, "processing", s) - await enter_compute_successful.wait() + await lock_erring.release() + await wait_for_state(f3.key, "error", b) - await lock_erring.release() - while f3.key in b.state.tasks: - await asyncio.sleep(0.05) await lock_successful.release() assert await f4 == 4 + 2 @@ -795,15 +796,15 @@ async def release_all_futures(): b.state.story(f3.key), expect=[ (f3.key, "ready", "executing", "executing", {}), - (f3.key, "executing", "released", "cancelled", {}), + (f3.key, "executing", "error", "error", {}), ( f3.key, - "cancelled", "error", "released", - {f2.key: "released", f3.key: "forgotten"}, + "released", + {f3.key: "forgotten"}, ), - (f3.key, "released", "forgotten", "forgotten", {f2.key: "forgotten"}), + (f3.key, "released", "forgotten", "forgotten", {}), ], ) From 23259b19c8d1d732a7d78baebff6b781526ff9a0 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 4 Jul 2023 14:23:18 +0200 Subject: [PATCH 13/13] Update distributed/tests/test_cancelled_state.py Co-authored-by: crusaderky --- distributed/tests/test_cancelled_state.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 55c33f54928..1609972fa8b 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -489,8 +489,7 @@ async def release_all_futures(): await lock_compute.release() await exit_compute.wait() - while f3.key in b.state.tasks: - await asyncio.sleep(0.01) + await async_poll_for(lambda: f3.key not in b.state.tasks, timeout=5) f1 = c.submit(inc, 1, key="f1", workers=[a.address]) f2 = c.submit(inc, f1, key="f2", workers=[a.address])