From dcdcd4cbeb73820f30c2efc778e3d3b25ebe953c Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 24 Nov 2022 18:28:42 +0100 Subject: [PATCH 1/4] Fix a deadlock when queued tasks are resubmitted quickly in succession --- distributed/scheduler.py | 2 +- distributed/tests/test_scheduler.py | 83 +++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b8150b190e6..6385095b269 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4702,7 +4702,7 @@ def stimulus_task_finished(self, key=None, worker=None, stimulus_id=None, **kwar ws: WorkerState = self.workers[worker] ts: TaskState = self.tasks.get(key) - if ts is None or ts.state == "released": + if ts is None or ts.state in ("released", "queued"): logger.debug( "Received already computed task, worker: %s, state: %s" ", key: %s, who_has: %s", diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 6961369d6be..556fa97b9c4 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4150,3 +4150,86 @@ async def test_transition_waiting_memory(c, s, a, b): assert s.tasks["x"].state == "no-worker" assert s.tasks["y"].state == "waiting" assert_story(s.story("y"), [("y", "waiting", "waiting", {})]) + + +@pytest.mark.parametrize("rootish", [True, False]) +@gen_cluster(client=True, nthreads=[("", 1)]) +async def test_deadlock_resubmit_queued_tasks_fast(c, s, a, rootish): + # See https://github.com/dask/distributed/issues/7200 + block = Event() + block2 = Event() + executing = Event() + executing2 = Event() + + def block_on_event(*args, block, executing): + executing.set() + block.wait() + + if rootish: + ntasks = s.total_nthreads * 2 + 1 + else: + ntasks = 1 + keys = [f"fut-{i}" for i in range(ntasks)] + + def submit_tasks(): + # Use case would be a client rescheduling the same or a similar graph + # multiple times, closely followed + # df.head() + # df.size.compute() + # We're emulating this by submitting the sames *keys* + return c.map( + block_on_event, range(len(keys)), block=block, executing=executing, key=keys + ) + + def assert_rootish(): + # Just to verify our assumptions in case the definition changes. This is + # currently a bit brittle + if rootish: + assert all(s.is_rootish(s.tasks[k]) for k in keys) + else: + assert not any(s.is_rootish(s.tasks[k]) for k in keys) + + f1 = submit_tasks() + # Make sure that the worker is properly saturated + nblocking_tasks = 5 + + # This set of tasks is there to guarantee that the worker is saturated after + # releasing the first set of tasks s.t. a subsequent submission would run + # into queuing + fut2 = c.map( + block_on_event, range(nblocking_tasks), block=block2, executing=executing2 + ) + + # Once the task is on the threadpool, the client/scheduler may start its + # release chain + await executing.wait() + + assert len(a.state.tasks) + # To trigger this condition, the scheduler needs to receive the + # `task-finished` message after it performed the client release transitions + # Therefore, the worker must not receive the `free-keys`` signal before it + # can finish the task since otherwise the worker would recognize it as + # cancelled and would forget about it. We emulate this behavior by blocking + # the outgoing scheduler stream until that happens, i.e. this introduces + # artifical latency + with freeze_batched_send(s.stream_comms[a.address]): + del f1 + while any(k in s.tasks for k in keys): + await asyncio.sleep(0.005) + + assert len(s.tasks) == nblocking_tasks + fut3 = submit_tasks() + while len(s.tasks) == nblocking_tasks: + await asyncio.sleep(0.005) + assert_rootish() + if rootish: + assert all(s.tasks[k] in s.queued for k in keys) + await block.set() + # At this point we need/want to wait for the task-finished message to + # arrive on the scheduler. There is no proper hook to wait, therefore we + # sleep + await asyncio.sleep(0.2) + # Everything should finish properly after this + await block2.set() + await c.gather(fut2) + await c.gather(fut3) From 926198ae4076058cb6a31972f8579fb249dc0963 Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 25 Nov 2022 10:00:35 +0100 Subject: [PATCH 2/4] Ensure all tasks are on the scheduler before proceeding --- distributed/tests/test_scheduler.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 556fa97b9c4..e8bb4802a6a 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4200,6 +4200,9 @@ def assert_rootish(): block_on_event, range(nblocking_tasks), block=block2, executing=executing2 ) + while len(s.tasks) == nblocking_tasks + len(keys): + await asyncio.sleep(0.005) + # Once the task is on the threadpool, the client/scheduler may start its # release chain await executing.wait() From 9026d47f36bc6e0c6a9937c680ce96ae0ec4921e Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 25 Nov 2022 10:39:44 +0100 Subject: [PATCH 3/4] oops --- distributed/tests/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index e8bb4802a6a..24f591ffc5c 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4200,7 +4200,7 @@ def assert_rootish(): block_on_event, range(nblocking_tasks), block=block2, executing=executing2 ) - while len(s.tasks) == nblocking_tasks + len(keys): + while len(s.tasks) != nblocking_tasks + len(keys): await asyncio.sleep(0.005) # Once the task is on the threadpool, the client/scheduler may start its From 9ecb67c26ac6f88846d2975f43c565690d5ecd8c Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 25 Nov 2022 14:58:44 +0100 Subject: [PATCH 4/4] Skip if rootish is disabled --- distributed/tests/test_scheduler.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 24f591ffc5c..62af49c6cc9 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -70,7 +70,9 @@ pytestmark = pytest.mark.ci1 - +QUEUING_ON_BY_DEFAULT = math.isfinite( + float(dask.config.get("distributed.scheduler.worker-saturation")) +) alice = "alice:1234" bob = "bob:1234" @@ -257,7 +259,7 @@ def random(**kwargs): @pytest.mark.skipif( - math.isfinite(float(dask.config.get("distributed.scheduler.worker-saturation"))), + QUEUING_ON_BY_DEFAULT, reason="Not relevant with queuing on; see https://github.com/dask/distributed/issues/7204", ) @gen_cluster( @@ -4152,7 +4154,19 @@ async def test_transition_waiting_memory(c, s, a, b): assert_story(s.story("y"), [("y", "waiting", "waiting", {})]) -@pytest.mark.parametrize("rootish", [True, False]) +@pytest.mark.parametrize( + "rootish", + [ + pytest.param( + True, + marks=pytest.mark.skipif( + not QUEUING_ON_BY_DEFAULT, + reason="Nothing will be classified as root-ish", + ), + ), + False, + ], +) @gen_cluster(client=True, nthreads=[("", 1)]) async def test_deadlock_resubmit_queued_tasks_fast(c, s, a, rootish): # See https://github.com/dask/distributed/issues/7200 @@ -4200,9 +4214,6 @@ def assert_rootish(): block_on_event, range(nblocking_tasks), block=block2, executing=executing2 ) - while len(s.tasks) != nblocking_tasks + len(keys): - await asyncio.sleep(0.005) - # Once the task is on the threadpool, the client/scheduler may start its # release chain await executing.wait()