Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 119 additions & 63 deletions distributed/tests/test_cancelled_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@
gen_cluster,
inc,
lock_inc,
slowinc,
wait_for_state,
wait_for_stimulus,
)
from distributed.worker_state_machine import (
AddKeysMsg,
ComputeTaskEvent,
Execute,
ExecuteFailureEvent,
ExecuteSuccessEvent,
FreeKeysEvent,
GatherDep,
GatherDepFailureEvent,
GatherDepNetworkFailureEvent,
GatherDepSuccessEvent,
TaskFinishedMsg,
UpdateDataEvent,
)


Expand Down Expand Up @@ -231,53 +233,30 @@ async def wait_and_raise(*args, **kwargs):
w.state.story(f1.key),
[
(f1.key, "executing", "released", "cancelled", {}),
(
f1.key,
"cancelled",
"error",
"error",
{f2.key: "executing", f1.key: "released"},
),
(f1.key, "error", "released", "released", {f1.key: "forgotten"}),
(f1.key, "cancelled", "error", "released", {f1.key: "forgotten"}),
(f1.key, "released", "forgotten", "forgotten", {}),
],
)


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_flight_cancelled_error(c, s, b):
"""One worker with one thread. We provoke an flight->cancelled transition
and let the task err."""
lock = asyncio.Lock()
await lock.acquire()
def test_flight_cancelled_error(ws):

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous test did not trip on #6877. I chose not to bother investigating why and just rewrote it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not keep both?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because a test that does not test its declared use case is useless and misleading

"""Test flight -> cancelled -> error transition loop.
This can be caused by an issue while (un)pickling or a bug in the network stack.

class BrokenWorker(Worker):
block_get_data = True

async def get_data(self, comm, *args, **kwargs):
if self.block_get_data:
async with lock:
comm.abort()
return await super().get_data(comm, *args, **kwargs)

async with BrokenWorker(s.address) as a:
await c.wait_for_workers(2)
fut1 = c.submit(inc, 1, workers=[a.address], allow_other_workers=True)
fut2 = c.submit(inc, fut1, workers=[b.address])
await wait_for_state(fut1.key, "flight", b)
fut2.release()
fut1.release()
await wait_for_state(fut1.key, "cancelled", b)
lock.release()
# At this point we do not fetch the result of the future since the
# future itself would raise a cancelled exception. At this point we're
# concerned about the worker. The task should transition over error to
# be eventually forgotten since we no longer hold a ref.
while fut1.key in b.state.tasks:
await asyncio.sleep(0.01)
a.block_get_data = False
# Everything should still be executing as usual after this
assert await c.submit(sum, c.map(inc, range(10))) == sum(map(inc, range(10)))
See https://github.com/dask/distributed/issues/6877
"""
ws2 = "127.0.0.1:2"
instructions = ws.handle_stimulus(
ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s1"),
FreeKeysEvent(keys=["y", "x"], stimulus_id="s2"),
GatherDepFailureEvent.from_exception(
Exception(), worker=ws2, total_nbytes=1, stimulus_id="s3"
),
)
assert instructions == [
GatherDep(worker=ws2, to_gather={"x"}, total_nbytes=1, stimulus_id="s1")
]
assert not ws.tasks


@gen_cluster(client=True, nthreads=[("", 1)])
Expand Down Expand Up @@ -332,6 +311,7 @@ def block_execution(lock):
(fut1.key, "resumed", "released", "cancelled", {}),
# After gather_dep receives the data, the task is forgotten
(fut1.key, "cancelled", "memory", "released", {fut1.key: "forgotten"}),
(fut1.key, "released", "forgotten", "forgotten", {}),
],
)

Expand Down Expand Up @@ -369,7 +349,8 @@ def block_execution(event, lock):
b.state.story(fut1.key),
[
(fut1.key, "executing", "released", "cancelled", {}),
(fut1.key, "cancelled", "error", "error", {fut1.key: "released"}),
(fut1.key, "cancelled", "error", "released", {fut1.key: "forgotten"}),
(fut1.key, "released", "forgotten", "forgotten", {}),
],
)

Expand Down Expand Up @@ -480,14 +461,18 @@ async def test_resumed_cancelled_handle_compute(
lock_compute = Lock()
await lock_compute.acquire()
enter_compute = Event()
exit_compute = Event()

def block(x, lock, enter_event):
def block(x, lock, enter_event, exit_event):
enter_event.set()
with lock:
if raise_error:
raise RuntimeError("test error")
else:
return x + 1
try:
with lock:
if raise_error:
raise RuntimeError("test error")
else:
return x + 1
finally:
exit_event.set()

f1 = c.submit(inc, 1, key="f1", workers=[a.address])
f2 = c.submit(inc, f1, key="f2", workers=[a.address])
Expand All @@ -496,6 +481,7 @@ def block(x, lock, enter_event):
f2,
lock=lock_compute,
enter_event=enter_compute,
exit_event=exit_compute,
key="f3",
workers=[b.address],
)
Expand Down Expand Up @@ -523,17 +509,20 @@ async def release_all_futures():
await wait_for_state(f3.key, "resumed", b)
await release_all_futures()

if not wait_for_processing:
await lock_compute.release()
await exit_compute.wait()

f1 = c.submit(inc, 1, key="f1", workers=[a.address])
f2 = c.submit(inc, f1, key="f2", workers=[a.address])
f3 = c.submit(inc, f2, key="f3", workers=[b.address])
f4 = c.submit(sum, [f1, f3], key="f4", workers=[b.address])

if wait_for_processing:
await wait_for_state(f3.key, "processing", s)
await lock_compute.release()

await lock_compute.release()

if not raise_error:
if not wait_for_processing and not raise_error:

@crusaderky crusaderky Aug 16, 2022

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_resumed_cancelled_handle_compute defeated me.
I could not make any sense of it.
I found it so unfathomable that it made me originally can this PR and restart from scratch with #6716, and then again with #6844.
This change makes it green again, but I am not sure that the tested stories are not highlighting any problems.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened #6905 with some explanations. I admit the test is a bit convoluted but I believe it is valuable

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the test is flaky now. I'll try investigating.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My best guess for the flakyness is that the ordering with wait_for_state(f3.key, "processing", s) and lock_compute.release is very timing sensitive. Previously, the state machine was wired in such a way that this wouldn't matter but that is no longer the case

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, if I add a sleep(1) just before await lock_compute.release(), the two tests with wait_for_processing=False fail deterministically.

It's a race condition of client.submit (client->scheduler comms) vs. distributed.Lock release (client->scheduler->worker).

Pushing a fix.

assert await f4 == 4 + 2

assert_story(
Expand All @@ -546,19 +535,55 @@ async def release_all_futures():
],
)

else:
elif not wait_for_processing and raise_error:
assert await f4 == 4 + 2

assert_story(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: instead of asserting on the story, we could as well assert on the events triggered. I consider the events much more readable and concise

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, I'm actually very curious what kind of events we previously ignored. It makes sense that the worker should not behave identically with different scheduler timings but apparently we ignored something before this change

b.state.story(f3.key),
expect=[
(f3.key, "ready", "executing", "executing", {}),
(f3.key, "executing", "released", "cancelled", {}),
(f3.key, "cancelled", "fetch", "resumed", {}),
(f3.key, "resumed", "error", "released", {f3.key: "fetch"}),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually a change in behavior. Previously we stayed in the error state. That was arguable a false behavior given the definitions of the cancelled and resumed state but I don't think this will matter in practice.

Just pointing it out, I'm fine with the new behavior

(f3.key, "fetch", "flight", "flight", {}),
(f3.key, "flight", "missing", "missing", {}),
(f3.key, "missing", "waiting", "waiting", {f2.key: "fetch"}),
(f3.key, "waiting", "ready", "ready", {f3.key: "executing"}),
(f3.key, "ready", "executing", "executing", {}),
(f3.key, "executing", "memory", "memory", {}),
],
)

elif wait_for_processing and not raise_error:
assert await f4 == 4 + 2

assert_story(
b.state.story(f3.key),
expect=[
(f3.key, "ready", "executing", "executing", {}),
(f3.key, "executing", "released", "cancelled", {}),
(f3.key, "cancelled", "fetch", "resumed", {}),
(f3.key, "resumed", "waiting", "executing", {}),
(f3.key, "executing", "memory", "memory", {}),
Comment on lines +566 to +567

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 looks much cleaner. I vaguely remember that I intentionally did not reset to executing for some reason but I'm glad if we can do it properly. This makes the states much more deterministic

],
)

elif wait_for_processing and raise_error:
with pytest.raises(RuntimeError, match="test error"):
await f3

assert_story(
b.state.story(f3.key),
expect=[
[
(f3.key, "ready", "executing", "executing", {}),
(f3.key, "executing", "released", "cancelled", {}),
(f3.key, "cancelled", "fetch", "resumed", {}),
(f3.key, "resumed", "error", "error", {}),
(f3.key, "resumed", "waiting", "executing", {}),
(f3.key, "executing", "error", "error", {}),
],
)
else:
assert False, "unreachable"


@pytest.mark.parametrize("intermediate_state", ["resumed", "cancelled"])
Expand All @@ -570,13 +595,9 @@ async def test_deadlock_cancelled_after_inflight_before_gather_from_worker(
"""If a task was transitioned to in-flight, the gather_dep coroutine was scheduled
but a cancel request came in before gather_data_from_worker was issued. This might
corrupt the state machine if the cancelled key is not properly handled.

See also
--------
test_workerstate_deadlock_cancelled_after_inflight_before_gather_from_worker
"""
fut1 = c.submit(slowinc, 1, workers=[a.address], key="f1")
fut1B = c.submit(slowinc, 2, workers=[x.address], key="f1B")
fut1 = c.submit(inc, 1, workers=[a.address], key="f1")
fut1B = c.submit(inc, 2, workers=[x.address], key="f1B")
fut2 = c.submit(sum, [fut1, fut1B], workers=[x.address], key="f2")
await fut2

Expand Down Expand Up @@ -661,14 +682,13 @@ def test_workerstate_executing_skips_fetch_on_success(ws_with_running_task):
ExecuteSuccessEvent.dummy("x", 123, stimulus_id="s3"),
)
assert instructions == [
TaskFinishedMsg.match(key="x", stimulus_id="s3"),
AddKeysMsg(keys=["x"], stimulus_id="s3"),
Execute(key="y", stimulus_id="s3"),
]
assert ws.tasks["x"].state == "memory"
assert ws.data["x"] == 123


@pytest.mark.xfail(reason="distributed#6689")
def test_workerstate_executing_failure_to_fetch(ws_with_running_task):
"""Test state loops:

Expand Down Expand Up @@ -887,3 +907,39 @@ async def resume():

# Test that x does not get stuck.
assert await fut == expect


@pytest.mark.parametrize("release_dep", [False, True])
@pytest.mark.parametrize("done_ev_cls", [ExecuteSuccessEvent, ExecuteFailureEvent])
def test_cancel_with_dependencies_in_memory(ws, release_dep, done_ev_cls):
"""Cancel an executing task y with an in-memory dependency x; then simulate that x
did not have any further dependents, so cancel x as well.

Test that x immediately transitions to released state and is forgotten as soon as
y finishes computing.

Read: https://github.com/dask/distributed/issues/6893"""
ws.handle_stimulus(
UpdateDataEvent(data={"x": 1}, report=False, stimulus_id="s1"),
ComputeTaskEvent.dummy("y", who_has={"x": [ws.address]}, stimulus_id="s2"),
)
assert ws.tasks["x"].state == "memory"
assert ws.tasks["y"].state == "executing"

ws.handle_stimulus(FreeKeysEvent(keys=["y"], stimulus_id="s3"))
assert ws.tasks["x"].state == "memory"
assert ws.tasks["y"].state == "cancelled"

if release_dep:
# This will happen iff x has no dependents or waiters on the scheduler
ws.handle_stimulus(FreeKeysEvent(keys=["x"], stimulus_id="s4"))
assert ws.tasks["x"].state == "released"
assert ws.tasks["y"].state == "cancelled"

ws.handle_stimulus(done_ev_cls.dummy("y", stimulus_id="s5"))
assert "y" not in ws.tasks
assert "x" not in ws.tasks
else:
ws.handle_stimulus(done_ev_cls.dummy("y", stimulus_id="s5"))
assert "y" not in ws.tasks
assert ws.tasks["x"].state == "memory"
20 changes: 2 additions & 18 deletions distributed/tests/test_worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1171,16 +1171,7 @@ def test_task_with_dependencies_acquires_resources(ws):

@pytest.mark.parametrize(
"done_ev_cls,done_status",
[
(ExecuteSuccessEvent, "memory"),
pytest.param(
ExecuteFailureEvent,
"flight",
marks=pytest.mark.xfail(
reason="distributed#6682,distributed#6689,distributed#6693"
),
),
],
[(ExecuteSuccessEvent, "memory"), (ExecuteFailureEvent, "flight")],
)
def test_resumed_task_releases_resources(
ws_with_running_task, done_ev_cls, done_status
Expand Down Expand Up @@ -1247,14 +1238,7 @@ def test_done_task_not_in_all_running_tasks(

@pytest.mark.parametrize(
"done_ev_cls,done_status",
[
(ExecuteSuccessEvent, "memory"),
pytest.param(
ExecuteFailureEvent,
"flight",
marks=pytest.mark.xfail(reason="distributed#6689"),
),
],
[(ExecuteSuccessEvent, "memory"), (ExecuteFailureEvent, "flight")],
)
def test_done_resumed_task_not_in_all_running_tasks(
ws_with_running_task, done_ev_cls, done_status
Expand Down
Loading