From 23b6d546250599feae2800b0d52002ac25d58e70 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 22 Jun 2022 11:02:23 +0100 Subject: [PATCH 1/2] Adding replicas to a task in fetch now sends it to flight immediately --- .../tests/test_worker_state_machine.py | 67 ++++++++++--------- 1 file changed, 34 insertions(+), 33 deletions(-) diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index 47c767c68fe..1f3d8039179 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -16,7 +16,6 @@ from distributed.scheduler import TaskState as SchedulerTaskState from distributed.utils import recursive_to_dict from distributed.utils_test import ( - BlockedGetData, _LockedCommPool, assert_story, freeze_data_fetching, @@ -569,9 +568,7 @@ async def test_fetch_to_missing(c, s, a, b): ) -@pytest.mark.skip(reason="https://github.com/dask/distributed/issues/6446") -@gen_cluster(client=True) -async def test_new_replica_while_all_workers_in_flight(c, s, w1, w2): +def test_new_replica_while_all_workers_in_flight(ws): """A task is stuck in 'fetch' state because all workers that hold a replica are in flight. While in this state, a new replica appears on a different worker and the scheduler informs the waiting worker through a new acquire-replicas or @@ -585,35 +582,39 @@ async def test_new_replica_while_all_workers_in_flight(c, s, w1, w2): Test that, when this happens, the task is immediately acquired from the new worker, without waiting for the original replica holders to get out of flight. """ - # Make sure find_missing is not involved - w1.periodic_callbacks["find-missing"].stop() - - async with BlockedGetData(s.address) as w3: - x = c.submit(inc, 1, key="x", workers=[w3.address]) - y = c.submit(inc, 2, key="y", workers=[w3.address]) - await wait([x, y]) - s.request_acquire_replicas(w1.address, ["x"], stimulus_id="test") - await w3.in_get_data.wait() - assert w1.state.tasks["x"].state == "flight" - s.request_acquire_replicas(w1.address, ["y"], stimulus_id="test") - # This cannot progress beyond fetch because w3 is already in flight - await wait_for_state("y", "fetch", w1) - - # Simulate that the AMM also requires that w2 acquires a replica of x. - # The replica lands on w2 soon afterwards, while w3->w1 comms remain blocked by - # unrelated transfers (x in our case). - w2.update_data({"y": 3}, report=True) - ws2 = s.workers[w2.address] - while ws2 not in s.tasks["y"].who_has: - await asyncio.sleep(0.01) - - # 2 seconds later, the AMM reiterates that w1 should acquire a replica of y - s.request_acquire_replicas(w1.address, ["y"], stimulus_id="test") - await wait_for_state("y", "memory", w1) - - # Finally let the other worker to get out of flight - w3.block_get_data.set() - await wait_for_state("x", "memory", w1) + instructions = ws.handle_stimulus( + AcquireReplicasEvent( + who_has={"x": ["127.0.0.1:2"]}, + nbytes={"x": 1}, + stimulus_id="s1", + ), + AcquireReplicasEvent( + who_has={"y": ["127.0.0.1:2"]}, + nbytes={"y": 1}, + stimulus_id="s2", + ), + AcquireReplicasEvent( + who_has={"y": ["127.0.0.1:2", "127.0.0.1:3"]}, + nbytes={"y": 1}, + stimulus_id="s3", + ), + ) + assert instructions == [ + GatherDep( + worker="127.0.0.1:2", + to_gather={"x"}, + total_nbytes=1, + stimulus_id="s1", + ), + GatherDep( + worker="127.0.0.1:3", + to_gather={"y"}, + total_nbytes=1, + stimulus_id="s3", + ), + ] + assert ws.tasks["x"].state == "flight" + assert ws.tasks["y"].state == "flight" @gen_cluster(client=True) From ec1633673b9e1fa8d91f27b4e8776f06dd8e7412 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 27 Jun 2022 13:21:36 +0100 Subject: [PATCH 2/2] Update distributed/tests/test_worker_state_machine.py Co-authored-by: Florian Jetter --- distributed/tests/test_worker_state_machine.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index 1f3d8039179..fdfbcb413d4 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -582,32 +582,34 @@ def test_new_replica_while_all_workers_in_flight(ws): Test that, when this happens, the task is immediately acquired from the new worker, without waiting for the original replica holders to get out of flight. """ + ws2 = "127.0.0.1:2" + ws3 = "127.0.0.1:3" instructions = ws.handle_stimulus( AcquireReplicasEvent( - who_has={"x": ["127.0.0.1:2"]}, + who_has={"x": [ws2]}, nbytes={"x": 1}, stimulus_id="s1", ), AcquireReplicasEvent( - who_has={"y": ["127.0.0.1:2"]}, + who_has={"y": [ws2]}, nbytes={"y": 1}, stimulus_id="s2", ), AcquireReplicasEvent( - who_has={"y": ["127.0.0.1:2", "127.0.0.1:3"]}, + who_has={"y": [ws2, ws3]}, nbytes={"y": 1}, stimulus_id="s3", ), ) assert instructions == [ GatherDep( - worker="127.0.0.1:2", + worker=ws2, to_gather={"x"}, total_nbytes=1, stimulus_id="s1", ), GatherDep( - worker="127.0.0.1:3", + worker=ws3, to_gather={"y"}, total_nbytes=1, stimulus_id="s3",