Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 1 addition & 104 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import psutil
import pytest
from tlz import first, merge, pluck, sliding_window
from tlz import first, pluck, sliding_window
from tornado.ioloop import IOLoop

import dask
Expand Down Expand Up @@ -3026,109 +3026,6 @@ async def test_remove_replicas_simple(c, s, a, b):
assert all(s.tasks[f.key].who_has == {s.workers[a.address]} for f in futs)


@gen_cluster(
client=True,
nthreads=[("", 1), ("", 6)], # Up to 5 threads of b will get stuck; read below
config=merge(NO_AMM, {"distributed.comm.recent-messages-log-length": 1_000}),
)
async def test_remove_replicas_while_computing(c, s, a, b):
futs = c.map(inc, range(10), workers=[a.address])
dependents_event = distributed.Event()

def some_slow(x, event):
if x % 2:
event.wait()
return x + 1

# All interesting things will happen on b
dependents = c.map(some_slow, futs, event=dependents_event, workers=[b.address])

while not any(f.key in b.state.tasks for f in dependents):
await asyncio.sleep(0.01)

# The scheduler removes keys from who_has/has_what immediately
# Make sure the worker responds to the rejection and the scheduler corrects
# the state
ws = s.workers[b.address]

def ws_has_futs(aggr_func):
nonlocal futs
return aggr_func(s.tasks[fut.key] in ws.has_what for fut in futs)

# Wait for all futs to transfer over
while not ws_has_futs(all):
await asyncio.sleep(0.01)

# Wait for some dependent tasks to be done. No more than half of the dependents can
# finish, as the others are blocked on dependents_event.
# Note: for this to work reliably regardless of scheduling order, we need to have 6+
# threads. At the moment of writing it works with 2 because futures of Client.map
# are always scheduled from left to right, but we'd rather not rely on this
# assumption.
while not any(fut.status == "finished" for fut in dependents):
await asyncio.sleep(0.01)
assert not all(fut.status == "finished" for fut in dependents)

# Try removing the initial keys
s.request_remove_replicas(
b.address, [fut.key for fut in futs], stimulus_id=f"test-{time()}"
)
# Scheduler removed all keys immediately...
assert not ws_has_futs(any)
# ... but the state is properly restored for all tasks for which the dependent task
# isn't done yet
while not ws_has_futs(any):
await asyncio.sleep(0.01)

# Let the remaining dependent tasks complete
await dependents_event.set()
await wait(dependents)
assert ws_has_futs(any) and not ws_has_futs(all)

# If a request is rejected, the worker responds with an add-keys message to
# reenlist the key in the schedulers state system to avoid race conditions,
# see also https://github.com/dask/distributed/issues/5265
rejections = set()
for msg in b.state.log:
if msg[0] == "remove-replica-rejected":
rejections.update(msg[1])
assert rejections

def answer_sent(key):
for batch in b.batched_stream.recent_message_log:
for msg in batch:
if "op" in msg and msg["op"] == "add-keys" and key in msg["keys"]:
return True
return False

for rejected_key in rejections:
assert answer_sent(rejected_key)

# Now that all dependent tasks are done, futs replicas may be removed.
# They might be already gone due to the above remove replica calls
s.request_remove_replicas(
b.address,
[fut.key for fut in futs if ws in s.tasks[fut.key].who_has],
stimulus_id=f"test-{time()}",
)

while any(
b.state.tasks[f.key].state != "released" for f in futs if f.key in b.state.tasks
):
await asyncio.sleep(0.01)

# The scheduler actually gets notified about the removed replica
while ws_has_futs(any):
await asyncio.sleep(0.01)
# A replica is still on workers[0]
assert all(len(s.tasks[f.key].who_has) == 1 for f in futs)

del dependents, futs

while any(w.state.tasks for w in (a, b)):
await asyncio.sleep(0.01)


@gen_cluster(client=True, nthreads=[("", 1)] * 3, config=NO_AMM)
async def test_who_has_consistent_remove_replicas(c, s, *workers):
a = workers[0]
Expand Down
38 changes: 12 additions & 26 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@
"constrained",
"executing",
"long-running",
"cancelled",
"resumed",
}
READY: set[TaskStateState] = {"ready", "constrained"}
# Valid states for a task that is found in TaskState.waiting_for_data
Expand Down Expand Up @@ -357,11 +355,6 @@ def _to_dict_no_nest(self, *, exclude: Container[str] = ()) -> dict:
# Remove all Nones and empty containers
return {k: v for k, v in out.items() if v}

def is_protected(self) -> bool:
return self.state in PROCESSING or any(
dep_ts.state in PROCESSING for dep_ts in self.dependents
)


@dataclass
class Instruction:
Expand Down Expand Up @@ -2837,35 +2830,28 @@ def _handle_remove_replicas(self, ev: RemoveReplicasEvent) -> RecsInstrs:
holding this unnecessary data, if the worker hasn't released the data itself,
already.

This handler does not guarantee the task nor the data to be actually
released but only asks the worker to release the data on a best effort
guarantee. This protects from race conditions where the given keys may
already have been rescheduled for compute in which case the compute
would win and this handler is ignored.
This handler only releases tasks that are indeed in state memory.

For stronger guarantees, see handler free_keys
"""
recommendations: Recs = {}
instructions: Instructions = []

rejected = []
for key in ev.keys:
ts = self.tasks.get(key)
if ts is None or ts.state != "memory":
continue
if not ts.is_protected():
self.log.append(
(ts.key, "remove-replica-confirmed", ev.stimulus_id, time())
)
recommendations[ts] = "released"
else:
rejected.append(key)

if rejected:
self.log.append(
("remove-replica-rejected", rejected, ev.stimulus_id, time())
)
instructions.append(AddKeysMsg(keys=rejected, stimulus_id=ev.stimulus_id))
# If the task is still in executing, the scheduler should never have
# asked the worker to drop this key.
# We cannot simply forget it because there is a time window between
# setting the state to executing and preparing/collecting the data
# for the task.
# If a dependency was released during this time, this would pop up
# as a KeyError during execute which is hard to understand
if any(dep.state == "executing" for dep in ts.dependents):
raise RuntimeError("Encountered invalid state")
self.log.append((ts.key, "remove-replica", ev.stimulus_id, time()))
recommendations[ts] = "released"

return recommendations, instructions

Expand Down