From 6a15062cfbf7d9bc69da1f90c574691c48c9440a Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 7 Jul 2022 18:02:37 +0200 Subject: [PATCH 1/4] Add dummy methods for and --- distributed/tests/test_steal.py | 7 +-- .../tests/test_worker_state_machine.py | 27 ++++++++++ distributed/worker_state_machine.py | 54 ++++++++++++++++++- 3 files changed, 81 insertions(+), 7 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 781a6a8ee4f..eb6b873d95b 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1331,13 +1331,8 @@ def test_steal_worker_state(ws_with_running_task): assert ws.tasks["x"].state == "cancelled" instructions = ws.handle_stimulus( - ExecuteSuccessEvent( + ExecuteSuccessEvent.dummy( key="x", - value=None, - start=0.0, - stop=1.0, - nbytes=8, - type=None, stimulus_id="s2", ), ) diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index 8df1a466abd..3a00d64d6e7 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -391,6 +391,19 @@ def test_executesuccess_to_dict(): assert ev3.type is None +def test_executesuccess_dummy(): + ev = ExecuteSuccessEvent.dummy(key="x", stimulus_id="s") + assert ev == ExecuteSuccessEvent( + key="x", + value=None, + start=0.0, + stop=1.0, + nbytes=8, + type=None, + stimulus_id="s", + ) + + def test_executefailure_to_dict(): ev = ExecuteFailureEvent( stimulus_id="test", @@ -431,6 +444,20 @@ def test_executefailure_to_dict(): assert ev3.traceback_text == "tb text" +def test_executefailure_dummy(): + ev = ExecuteFailureEvent.dummy(key="x", stimulus_id="s") + assert ev == ExecuteFailureEvent( + key="x", + start=None, + stop=None, + exception=Serialize(None), + traceback=None, + exception_text="", + traceback_text="", + stimulus_id="s", + ) + + @gen_cluster(client=True) async def test_fetch_to_compute(c, s, a, b): with freeze_data_fetching(b): diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 647f3cd331a..357a2440d98 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -70,6 +70,8 @@ else: TaskStateState = str +Type = type + # TaskState.state subsets PROCESSING: set[TaskStateState] = { "waiting", @@ -733,7 +735,7 @@ class ExecuteSuccessEvent(StateMachineEvent): start: float stop: float nbytes: int - type: type | None + type: Type | None __slots__ = tuple(__annotations__) def to_loggable(self, *, handled: float) -> StateMachineEvent: @@ -746,6 +748,30 @@ def _after_from_dict(self) -> None: self.value = None self.type = None + @staticmethod + def dummy( + *, + key: str, + value: object = None, + start: float = 0.0, + stop: float = 1.0, + nbytes: int = 8, + type_: Type | None = None, + stimulus_id: str, + ) -> ExecuteSuccessEvent: + """Build a dummy event, with most attributes set to a reasonable default. + This is a convenience method to be used in unit testing only. + """ + return ExecuteSuccessEvent( + key=key, + value=value, + start=start, + stop=stop, + nbytes=nbytes, + type=type_, + stimulus_id=stimulus_id, + ) + @dataclass class ExecuteFailureEvent(StateMachineEvent): @@ -788,6 +814,32 @@ def from_exception( stimulus_id=stimulus_id, ) + @staticmethod + def dummy( + *, + key: str, + start: float | None = None, + stop: float | None = None, + exception: Serialize | None = None, + traceback: Serialize | None = None, + exception_text: str = "", + traceback_text: str = "", + stimulus_id: str, + ) -> ExecuteFailureEvent: + """Build a dummy event, with most attributes set to a reasonable default. + This is a convenience method to be used in unit testing only. + """ + return ExecuteFailureEvent( + key=key, + start=start, + stop=stop, + exception=exception if exception is not None else Serialize(None), + traceback=traceback, + exception_text=exception_text, + traceback_text=traceback_text, + stimulus_id=stimulus_id, + ) + @dataclass class CancelComputeEvent(StateMachineEvent): From 1ea8696ad0179f6f4b55bde5612e267cd403390f Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 7 Jul 2022 20:08:41 +0200 Subject: [PATCH 2/4] Fix signature --- distributed/worker_state_machine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 357a2440d98..890aa0cdeda 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -756,7 +756,7 @@ def dummy( start: float = 0.0, stop: float = 1.0, nbytes: int = 8, - type_: Type | None = None, + type: Type | None = None, stimulus_id: str, ) -> ExecuteSuccessEvent: """Build a dummy event, with most attributes set to a reasonable default. @@ -768,7 +768,7 @@ def dummy( start=start, stop=stop, nbytes=nbytes, - type=type_, + type=type, stimulus_id=stimulus_id, ) From b3c006d220d016b9e69e4216d6f48dce83bbab66 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 7 Jul 2022 22:31:21 +0100 Subject: [PATCH 3/4] Code review --- .../tests/test_worker_state_machine.py | 13 ++++--- distributed/worker_state_machine.py | 39 +++++++------------ 2 files changed, 22 insertions(+), 30 deletions(-) diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index 3a00d64d6e7..f08af1af8ca 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -308,7 +308,7 @@ def test_computetask_to_dict(): def test_computetask_dummy(): - ev = ComputeTaskEvent.dummy(key="x", stimulus_id="s") + ev = ComputeTaskEvent.dummy("x", stimulus_id="s") assert ev == ComputeTaskEvent( key="x", who_has={}, @@ -326,7 +326,7 @@ def test_computetask_dummy(): ) # nbytes is generated from who_has if omitted - ev2 = ComputeTaskEvent.dummy(key="x", who_has={"y": "127.0.0.1:2"}, stimulus_id="s") + ev2 = ComputeTaskEvent.dummy("x", who_has={"y": "127.0.0.1:2"}, stimulus_id="s") assert ev2.nbytes == {"y": 1} @@ -392,17 +392,20 @@ def test_executesuccess_to_dict(): def test_executesuccess_dummy(): - ev = ExecuteSuccessEvent.dummy(key="x", stimulus_id="s") + ev = ExecuteSuccessEvent.dummy("x", stimulus_id="s") assert ev == ExecuteSuccessEvent( key="x", value=None, start=0.0, stop=1.0, - nbytes=8, + nbytes=1, type=None, stimulus_id="s", ) + ev2 = ExecuteSuccessEvent.dummy("x", 123, stimulus_id="s") + assert ev2.value == 123 + def test_executefailure_to_dict(): ev = ExecuteFailureEvent( @@ -445,7 +448,7 @@ def test_executefailure_to_dict(): def test_executefailure_dummy(): - ev = ExecuteFailureEvent.dummy(key="x", stimulus_id="s") + ev = ExecuteFailureEvent.dummy("x", stimulus_id="s") assert ev == ExecuteFailureEvent( key="x", start=None, diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 890aa0cdeda..e5b04da75c2 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -70,8 +70,6 @@ else: TaskStateState = str -Type = type - # TaskState.state subsets PROCESSING: set[TaskStateState] = { "waiting", @@ -697,8 +695,8 @@ def _after_from_dict(self) -> None: @staticmethod def dummy( - *, key: str, + *, who_has: dict[str, Collection[str]] | None = None, nbytes: dict[str, int] | None = None, priority: tuple[int, ...] = (0,), @@ -735,7 +733,7 @@ class ExecuteSuccessEvent(StateMachineEvent): start: float stop: float nbytes: int - type: Type | None + type: type | None __slots__ = tuple(__annotations__) def to_loggable(self, *, handled: float) -> StateMachineEvent: @@ -750,13 +748,10 @@ def _after_from_dict(self) -> None: @staticmethod def dummy( - *, key: str, value: object = None, - start: float = 0.0, - stop: float = 1.0, - nbytes: int = 8, - type: Type | None = None, + *, + nbytes: int = 1, stimulus_id: str, ) -> ExecuteSuccessEvent: """Build a dummy event, with most attributes set to a reasonable default. @@ -765,10 +760,10 @@ def dummy( return ExecuteSuccessEvent( key=key, value=value, - start=start, - stop=stop, + start=0.0, + stop=1.0, nbytes=nbytes, - type=type, + type=None, stimulus_id=stimulus_id, ) @@ -816,14 +811,8 @@ def from_exception( @staticmethod def dummy( - *, key: str, - start: float | None = None, - stop: float | None = None, - exception: Serialize | None = None, - traceback: Serialize | None = None, - exception_text: str = "", - traceback_text: str = "", + *, stimulus_id: str, ) -> ExecuteFailureEvent: """Build a dummy event, with most attributes set to a reasonable default. @@ -831,12 +820,12 @@ def dummy( """ return ExecuteFailureEvent( key=key, - start=start, - stop=stop, - exception=exception if exception is not None else Serialize(None), - traceback=traceback, - exception_text=exception_text, - traceback_text=traceback_text, + start=None, + stop=None, + exception=Serialize(None), + traceback=None, + exception_text="", + traceback_text="", stimulus_id=stimulus_id, ) From 86f598756969b27c93d26751f2d5d3dc892bbe7a Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 7 Jul 2022 22:36:31 +0100 Subject: [PATCH 4/4] More compact --- distributed/tests/test_cancelled_state.py | 8 +++----- distributed/tests/test_steal.py | 7 +------ 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 1d494c186ae..38d31c48b63 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -425,7 +425,7 @@ def test_cancelled_resumed_after_flight_with_dependencies_workerstate(ws): ws2 = "127.0.0.1:2" instructions = ws.handle_stimulus( # Create task x and put it in flight from ws2 - ComputeTaskEvent.dummy(key="y", who_has={"x": [ws2]}, stimulus_id="s1"), + ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s1"), # The scheduler realises that ws2 is unresponsive, although ws doesn't know yet. # Having lost the last surviving replica of x, the scheduler cancels all of its # dependents. This also cancels x. @@ -433,7 +433,7 @@ def test_cancelled_resumed_after_flight_with_dependencies_workerstate(ws): # The scheduler reschedules x on another worker, which just happens to be one # that was previously fetching it. This does not generate an Execute # instruction, because the GatherDep instruction isn't complete yet. - ComputeTaskEvent.dummy(key="x", stimulus_id="s3"), + ComputeTaskEvent.dummy("x", stimulus_id="s3"), # After ~30s, the TCP socket with ws2 finally times out and collapses. # This triggers the Execute instruction. GatherDepNetworkFailureEvent(worker=ws2, total_nbytes=1, stimulus_id="s4"), @@ -556,9 +556,7 @@ def test_resume_executing_worker_state(ws_with_running_task): instructions = ws.handle_stimulus( FreeKeysEvent(keys=["x"], stimulus_id="s1"), - ComputeTaskEvent.dummy( - key="x", resource_restrictions={"R": 1}, stimulus_id="s2" - ), + ComputeTaskEvent.dummy("x", resource_restrictions={"R": 1}, stimulus_id="s2"), ) assert not instructions assert ws.tasks["x"] is ts diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index eb6b873d95b..a6df08d4c2e 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1330,12 +1330,7 @@ def test_steal_worker_state(ws_with_running_task): assert ws.available_resources == {"R": 0} assert ws.tasks["x"].state == "cancelled" - instructions = ws.handle_stimulus( - ExecuteSuccessEvent.dummy( - key="x", - stimulus_id="s2", - ), - ) + instructions = ws.handle_stimulus(ExecuteSuccessEvent.dummy("x", stimulus_id="s2")) assert not instructions assert "x" not in ws.tasks assert "x" not in ws.data