From 6fb9c3272cc5cec634596aa3fcf863e25a8dbf77 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 15 Sep 2021 14:50:25 +0200 Subject: [PATCH 1/5] Workers submit a reply to the scheduler if replica removal was rejected --- distributed/scheduler.py | 6 ++++-- distributed/tests/test_worker.py | 32 +++++++++++++++++++++++--------- distributed/worker.py | 18 ++++++++++++++---- 3 files changed, 41 insertions(+), 15 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index c538056e7dc..fc477c502f6 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -6699,7 +6699,7 @@ async def retire_workers( return worker_keys - def add_keys(self, comm=None, worker=None, keys=()): + def add_keys(self, comm=None, worker=None, keys=(), stimulus_id=None): """ Learn that a worker has certain keys @@ -6722,12 +6722,14 @@ def add_keys(self, comm=None, worker=None, keys=()): redundant_replicas.append(key) if redundant_replicas: + if not stimulus_id: + stimulus_id = f"redundant-replicas-{time()}" self.worker_send( worker, { "op": "remove-replicas", "keys": redundant_replicas, - "stimulus_id": f"redundant-replicas-{time()}", + "stimulus_id": stimulus_id, }, ) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index b68643b8448..d59d535ab5a 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -2860,13 +2860,16 @@ async def test_remove_replica_simple(c, s, a, b): await asyncio.sleep(0.01) -@gen_cluster(client=True) +@gen_cluster( + client=True, + config={"distributed.comm.recent-messages-log-length": 1_000}, +) async def test_remove_replica_while_computing(c, s, *workers): futs = c.map(inc, range(10), workers=[workers[0].address]) # All interesting things will happen on that worker w = workers[1] - intermediate = c.map(slowinc, futs, delay=0.1, workers=[w.address]) + intermediate = c.map(slowinc, futs, delay=0.05, workers=[w.address]) def reduce(*args, **kwargs): import time @@ -2875,24 +2878,35 @@ def reduce(*args, **kwargs): return final = c.submit(reduce, intermediate, workers=[w.address], key="final") - while final.key not in w.tasks: + while not any(f.key in w.tasks for f in intermediate): await asyncio.sleep(0.001) while not all(fut.done() for fut in intermediate): # The worker should reject all of these since they are required _remove_replicas(s, w, *futs) - _remove_replicas(s, w, *intermediate) - await asyncio.sleep(0.001) + await asyncio.sleep(0.01) await wait(intermediate) + # If a request is rejected, the worker responds with an add-keys message to + # reenlist the key in the schedulers state system to avoid race conditions, + # see also https://github.com/dask/distributed/issues/5265 + rejections = [msg[1] for msg in w.log if msg[0] == "remove-replica-rejected"] + assert rejections + for rejected_key in rejections: + + def answer_sent(key): + for batch in w.batched_stream.recent_message_log: + for msg in batch: + if "op" in msg and msg["op"] == "add-keys" and key in msg["keys"]: + return True + return False + + assert answer_sent(rejected_key) + # Since intermediate is done, futs replicas may be removed. # They might be already gone due to the above remove replica calls _remove_replicas(s, w, *futs) - # the intermediate tasks should not be touched because they are still needed - # (the scheduler should not have made the above call but we should be safe - # regarless) - assert all(w.tasks[f.key].state == "memory" for f in intermediate) while any(w.tasks[f.key].state != "released" for f in futs if f.key in w.tasks): await asyncio.sleep(0.001) diff --git a/distributed/worker.py b/distributed/worker.py index 4e240dfa58b..8b1095afe67 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1571,7 +1571,9 @@ def update_data( self.log.append((key, "receive-from-scatter")) if report: - scheduler_messages.append({"op": "add-keys", "keys": list(data)}) + scheduler_messages.append( + {"op": "add-keys", "keys": list(data), "stimulus_id": stimulus_id} + ) self.transitions(recommendations, stimulus_id=stimulus_id) for msg in scheduler_messages: @@ -1621,8 +1623,16 @@ def handle_remove_replicas(self, keys, stimulus_id): recommendations = {} for key in keys: ts = self.tasks.get(key) - if ts and not ts.is_protected(): + if ts is None or ts.state != "memory": + continue + if not ts.is_protected(): + self.log.append(("remove-replica-confirmed", ts.key, stimulus_id)) recommendations[ts] = "released" if ts.dependents else "forgotten" + else: + self.log.append(("remove-replica-rejected", ts.key, stimulus_id)) + self.batched_stream.send( + {"op": "add-keys", "keys": [ts.key], "stimulus_id": stimulus_id} + ) self.transitions(recommendations=recommendations, stimulus_id=stimulus_id) @@ -2096,14 +2106,14 @@ def transition_executing_long_running(self, ts, compute_duration, *, stimulus_id def transition_released_memory(self, ts, value, *, stimulus_id): recs, smsgs = self._put_key_in_memory(ts, value, stimulus_id=stimulus_id) - smsgs.append({"op": "add-keys", "keys": [ts.key]}) + smsgs.append({"op": "add-keys", "keys": [ts.key], "stimulus_id": stimulus_id}) return recs, smsgs def transition_flight_memory(self, ts, value, *, stimulus_id): self._in_flight_tasks.discard(ts) ts.coming_from = None recs, smsgs = self._put_key_in_memory(ts, value, stimulus_id=stimulus_id) - smsgs.append({"op": "add-keys", "keys": [ts.key]}) + smsgs.append({"op": "add-keys", "keys": [ts.key], "stimulus_id": stimulus_id}) return recs, smsgs def transition_released_forgotten(self, ts, *, stimulus_id): From d2debca3f64a80b5bc104ea4aae9601892dee3da Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 28 Sep 2021 15:27:41 +0200 Subject: [PATCH 2/5] Ensure scheduler who_has state is restored after worker response --- distributed/tests/test_worker.py | 37 +++++++++++++++++++++++++++----- distributed/worker.py | 13 +++++++---- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index d59d535ab5a..8cdcddc479a 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -2760,8 +2760,14 @@ def _acquire_replicas(scheduler, worker, *futures): def _remove_replicas(scheduler, worker, *futures): keys = [f.key for f in futures] - - scheduler.stream_comms[worker.address].send( + ws = scheduler.workers[worker.address] + for k in keys: + ts = scheduler.tasks[k] + if ws in ts.who_has: + ts.who_has.remove(ws) + ws._nbytes -= ts.get_nbytes() + del scheduler.workers[ws.address]._has_what[ts] + scheduler.stream_comms[ws.address].send( { "op": "remove-replicas", "keys": keys, @@ -2859,6 +2865,10 @@ async def test_remove_replica_simple(c, s, a, b): while not all(len(s.tasks[f.key].who_has) == 1 for f in futs): await asyncio.sleep(0.01) + # Ensure there is no delayed reply to re-register the key + await asyncio.sleep(0.01) + assert all(s.tasks[f.key].who_has == {s.workers[a.address]} for f in futs) + @gen_cluster( client=True, @@ -2878,11 +2888,26 @@ def reduce(*args, **kwargs): return final = c.submit(reduce, intermediate, workers=[w.address], key="final") + while not any(f.key in w.tasks for f in intermediate): await asyncio.sleep(0.001) + # The scheduler removes keys from who_has/has_what immediately + # Make sure the worker responds to the rejection and the scheduler corrects + # the state + ws = s.workers[w.address] + while not any(s.tasks[fut.key] in ws.has_what for fut in futs): + await asyncio.sleep(0.001) + + _remove_replicas(s, w, *futs) + # Scheduler removed keys immediately... + assert not any(s.tasks[fut.key] in ws.has_what for fut in futs) + # ... but the state is properly restored + while not any(s.tasks[fut.key] in ws.has_what for fut in futs): + await asyncio.sleep(0.01) + + # The worker should reject all of these since they are required while not all(fut.done() for fut in intermediate): - # The worker should reject all of these since they are required _remove_replicas(s, w, *futs) await asyncio.sleep(0.01) @@ -2891,8 +2916,10 @@ def reduce(*args, **kwargs): # If a request is rejected, the worker responds with an add-keys message to # reenlist the key in the schedulers state system to avoid race conditions, # see also https://github.com/dask/distributed/issues/5265 - rejections = [msg[1] for msg in w.log if msg[0] == "remove-replica-rejected"] - assert rejections + rejections = set() + for msg in w.log: + if msg[0] == "remove-replica-rejected": + rejections.update(msg[1]) for rejected_key in rejections: def answer_sent(key): diff --git a/distributed/worker.py b/distributed/worker.py index 8b1095afe67..9d408c9c5ba 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1621,6 +1621,8 @@ def handle_remove_replicas(self, keys, stimulus_id): """ self.log.append(("remove-replicas", keys, stimulus_id)) recommendations = {} + + rejected = [] for key in keys: ts = self.tasks.get(key) if ts is None or ts.state != "memory": @@ -1629,10 +1631,13 @@ def handle_remove_replicas(self, keys, stimulus_id): self.log.append(("remove-replica-confirmed", ts.key, stimulus_id)) recommendations[ts] = "released" if ts.dependents else "forgotten" else: - self.log.append(("remove-replica-rejected", ts.key, stimulus_id)) - self.batched_stream.send( - {"op": "add-keys", "keys": [ts.key], "stimulus_id": stimulus_id} - ) + rejected.append(key) + + if rejected: + self.log.append(("remove-replica-rejected", rejected, stimulus_id)) + self.batched_stream.send( + {"op": "add-keys", "keys": rejected, "stimulus_id": stimulus_id} + ) self.transitions(recommendations=recommendations, stimulus_id=stimulus_id) From ac9b7f438c6499c7c062283ee7d2b29563b237ad Mon Sep 17 00:00:00 2001 From: Florian Jetter Date: Tue, 28 Sep 2021 15:35:04 +0200 Subject: [PATCH 3/5] Update distributed/tests/test_worker.py Co-authored-by: crusaderky --- distributed/tests/test_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 8cdcddc479a..0738c46b545 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -2766,7 +2766,7 @@ def _remove_replicas(scheduler, worker, *futures): if ws in ts.who_has: ts.who_has.remove(ws) ws._nbytes -= ts.get_nbytes() - del scheduler.workers[ws.address]._has_what[ts] + del ws._has_what[ts] scheduler.stream_comms[ws.address].send( { "op": "remove-replicas", From 4a46dc9a01581ab86b903d4b78699ed6a5c222fa Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 28 Sep 2021 15:54:15 +0200 Subject: [PATCH 4/5] review comments about test --- distributed/tests/test_worker.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 8cdcddc479a..c17a9bdcceb 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -2858,11 +2858,9 @@ async def test_remove_replica_simple(c, s, a, b): _remove_replicas(s, b, *futs) - while b.tasks: - await asyncio.sleep(0.01) + assert all(len(s.tasks[f.key].who_has) == 1 for f in futs) - # might take a moment for the reply to reach the scheduler - while not all(len(s.tasks[f.key].who_has) == 1 for f in futs): + while b.tasks: await asyncio.sleep(0.01) # Ensure there is no delayed reply to re-register the key From 846d7f85d471f0188c563d30b08c1bda7796b635 Mon Sep 17 00:00:00 2001 From: Florian Jetter Date: Tue, 28 Sep 2021 16:51:40 +0200 Subject: [PATCH 5/5] Update distributed/tests/test_worker.py Co-authored-by: crusaderky --- distributed/tests/test_worker.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 4c214810d9a..a250e6827d9 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -2764,9 +2764,7 @@ def _remove_replicas(scheduler, worker, *futures): for k in keys: ts = scheduler.tasks[k] if ws in ts.who_has: - ts.who_has.remove(ws) - ws._nbytes -= ts.get_nbytes() - del ws._has_what[ts] + scheduler.remove_replica(ts, ws) scheduler.stream_comms[ws.address].send( { "op": "remove-replicas",