Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions distributed/tests/test_cancelled_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,37 @@ async def test_resumed_cancelled_handle_compute(
Given the history of a task
executing -> cancelled -> resumed(fetch)

A handle_compute should properly restore executing.
Setup
-----
A task is running on the threadpool while a client is cancelling the
computations. In this case we cannot cancel the task on the threadpool but
need to transition the worker task to cancelled instead. While the task on
the threadpool is still running, the client resubmits the graph. This is
relatively common in interactive workflows in which a user cancels a
computation and resubmits shortly after.

This resubmission can decide to distribute tasks onto different workers than the initial submission

Parameters
----------
raise_error:
Both successful and erred results should be properly handled.

wait_for_processing:
Is the scheduler properly handling the task-done message of the worker
even if the task is not, yet, transitioned to processing again?

Expectation
-----------
A `handle_compute` should properly restore state `executing` even after the
task has transitioned through `cancelled` and `resumed(fetch)`.


See also
--------
test_worker_state_machine.py::test_executing_cancelled_fetch_executing
for minimal example
"""
# This test is heavily using set_restrictions to simulate certain scheduler
# decisions of placing keys

lock_compute = Lock()
await lock_compute.acquire()
Expand Down Expand Up @@ -515,6 +542,12 @@ async def release_all_futures():
await release_all_futures()
await wait_for_state(f3.key, "cancelled", b)

# At the second iteration, the scheduler chooses to distribute the tasks to
# different workers
# Particularly for our "stuck" task f3 that is still cancelled(executing) on
# worker B this will transition the task to a resumed(fetch) since it is no
# longer supposed to be computed on B. However, B will still require the
# data of f3 and is supposed to fetch it in case the compute fails
f1 = c.submit(inc, 1, key="f1", workers=[a.address])
f2 = c.submit(inc, f1, key="f2", workers=[a.address])
f3 = c.submit(inc, f2, key="f3", workers=[a.address])
Expand All @@ -523,6 +556,9 @@ async def release_all_futures():
await wait_for_state(f3.key, "resumed", b)
await release_all_futures()

# We're again cancelling, forcing f3 to transition back to executing from a
# resumed(fetch) state

f1 = c.submit(inc, 1, key="f1", workers=[a.address])
f2 = c.submit(inc, f1, key="f2", workers=[a.address])
f3 = c.submit(inc, f2, key="f3", workers=[b.address])
Expand Down
33 changes: 33 additions & 0 deletions distributed/tests/test_worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
SerializedTask,
StateMachineEvent,
TaskErredMsg,
TaskFinishedMsg,
TaskState,
TransitionCounterMaxExceeded,
UnpauseEvent,
Expand Down Expand Up @@ -1293,3 +1294,35 @@ def test_gather_dep_failure(ws):
]
assert ws.tasks["x"].state == "error"
assert ws.tasks["y"].state == "waiting" # Not ready


@pytest.mark.parametrize("fail", [True, False])
def test_executing_cancelled_fetch_executing(ws, fail):

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: This actually fails when using the ws_with_running_task fixture because we're not releasing a resource properly. Will try again after #6699

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you move this test to test_cancelled_state.py?
I'd rather have tests grouped by what they test, not by how they test it.
I already added a -m workerstate to pytest to run all WorkerState tests throughout the whole test suite (#6706) for this purpose.

"""See also test_cancelled_state.py::test_resumed_cancelled_handle_compute for full example"""

ws2 = "127.0.0.1:2"
instructions = ws.handle_stimulus(
ComputeTaskEvent.dummy("x", stimulus_id="s1"),
FreeKeysEvent(keys=["x"], stimulus_id="s2"),
ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s3"),
FreeKeysEvent(keys=["y"], stimulus_id="s4"),
ComputeTaskEvent.dummy("x", stimulus_id="s5"),
)

assert len(instructions) == 1
assert instructions[0] == Execute(key="x", stimulus_id="s1")
Comment on lines +1312 to +1313

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert len(instructions) == 1
assert instructions[0] == Execute(key="x", stimulus_id="s1")
assert instructions == [Execute(key="x", stimulus_id="s1")]

if fail:
instructions = ws.handle_stimulus(
ExecuteFailureEvent.dummy(key="x", stimulus_id="s6")
)
assert len(instructions) == 1
assert isinstance(instructions[0], TaskErredMsg)
Comment on lines +1318 to +1319

@crusaderky crusaderky Aug 18, 2022

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert len(instructions) == 1
assert isinstance(instructions[0], TaskErredMsg)
assert instructions == [TaskErredMsg.match(key="x", stimulus_id="s6")]

assert ws.tasks["x"].state == "error"

else:
instructions = ws.handle_stimulus(
ExecuteSuccessEvent.dummy(key="x", stimulus_id="s6")
)
assert len(instructions) == 1
assert isinstance(instructions[0], TaskFinishedMsg)
Comment on lines +1326 to +1327

@crusaderky crusaderky Aug 18, 2022

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert len(instructions) == 1
assert isinstance(instructions[0], TaskFinishedMsg)
assert instructions == [TaskFinishedMsg.match(key="x", stimulus_id="s6")]

assert ws.tasks["x"].state == "memory"