From a7837d361bc75c4c9f4d5e73512b8b850c4ad27e Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 18 Jul 2023 15:22:00 +0100 Subject: [PATCH 1/3] gather_dep_hangs --- distributed/tests/test_worker.py | 74 ++++++++++++++++++++++++ distributed/tests/test_worker_metrics.py | 1 + distributed/worker.py | 5 +- 3 files changed, 79 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index cf902c0b41d..ebe603d1fa5 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3351,6 +3351,80 @@ async def test_gather_dep_no_longer_in_flight_tasks(c, s, a): assert not any("missing-dep" in msg for msg in f2_story) +@pytest.mark.repeat(10) +@gen_cluster(client=True, nthreads=[("", 1)]) +async def test_gather_dep_cancelled_error(c, s, a): + """Something somewhere in the networking stack raises CancelledError while + gather_dep is running + + See Also + -------- + test_get_data_cancelled_error + https://github.com/dask/distributed/issues/8006 + """ + async with BlockedGetData(s.address) as b: + x = c.submit(inc, 1, key="x", workers=[b.address]) + y = c.submit(inc, x, key="y", workers=[a.address]) + await b.in_get_data.wait() + tasks = { + task for task in asyncio.all_tasks() if "gather_dep" in task.get_name() + } + assert tasks + # There should be only one task but cope with finding more just in case a + # previous test didn't properly clean up + for task in tasks: + task.cancel() + + b.block_get_data.set() + assert await y == 3 + + assert_story( + a.state.story("x"), + [ + ("x", "fetch", "flight", "flight", {}), + ("x", "flight", "missing", "missing", {}), + ("x", "missing", "fetch", "fetch", {}), + ("x", "fetch", "flight", "flight", {}), + ("x", "flight", "memory", "memory", {"y": "ready"}), + ], + ) + + +@pytest.mark.repeat(10) +@gen_cluster(client=True, nthreads=[("", 1)], timeout=5) +async def test_get_data_cancelled_error(c, s, a): + """Something somewhere in the networking stack raises CancelledError while + get_data is running + + See Also + -------- + test_gather_dep_cancelled_error + https://github.com/dask/distributed/issues/8006 + """ + + class FlakyInboundRPC(Worker): + flake = 0 + + def handle_comm(self, comm): + if self.flake: + self.flake -= 1 + + async def write(*args, **kwargs): + raise asyncio.CancelledError() + + comm.write = write + + return super().handle_comm(comm) + + async with FlakyInboundRPC(s.address) as b: + x = c.submit(inc, 1, key="x", workers=[b.address]) + await wait(x) + b.flake = 2 + y = c.submit(inc, x, key="y", workers=[a.address]) + assert await y == 3 + assert b.flake == 0 + + @gen_cluster(client=True, nthreads=[("", 1)]) async def test_Worker__to_dict(c, s, a): x = c.submit(inc, 1, key="x") diff --git a/distributed/tests/test_worker_metrics.py b/distributed/tests/test_worker_metrics.py index 7efd261f70e..de44def09a3 100644 --- a/distributed/tests/test_worker_metrics.py +++ b/distributed/tests/test_worker_metrics.py @@ -304,6 +304,7 @@ def expand(): b.state.validate = False +@pytest.mark.repeat(100) @gen_cluster( client=True, nthreads=[("", 1)], diff --git a/distributed/worker.py b/distributed/worker.py index fd4941a22b4..5f4ae233d9d 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2081,7 +2081,10 @@ async def gather_dep( stimulus_id=f"gather-dep-success-{time()}", ) - except OSError: + # Note: CancelledError and asyncio.TimeoutError are rare conditions + # that can be raised by the network stack. + # See https://github.com/dask/distributed/issues/8006 + except (OSError, asyncio.CancelledError, asyncio.TimeoutError): logger.exception("Worker stream died during communication: %s", worker) self.state.log.append( ("gather-dep-failed", worker, to_gather, stimulus_id, time()) From 185922faa60fb9dfa7a1c19e637237e2b20f9f22 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 18 Jul 2023 17:44:41 +0100 Subject: [PATCH 2/3] Revert temp stress --- distributed/tests/test_worker.py | 2 -- distributed/tests/test_worker_metrics.py | 1 - 2 files changed, 3 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index ebe603d1fa5..29b789618c7 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3351,7 +3351,6 @@ async def test_gather_dep_no_longer_in_flight_tasks(c, s, a): assert not any("missing-dep" in msg for msg in f2_story) -@pytest.mark.repeat(10) @gen_cluster(client=True, nthreads=[("", 1)]) async def test_gather_dep_cancelled_error(c, s, a): """Something somewhere in the networking stack raises CancelledError while @@ -3390,7 +3389,6 @@ async def test_gather_dep_cancelled_error(c, s, a): ) -@pytest.mark.repeat(10) @gen_cluster(client=True, nthreads=[("", 1)], timeout=5) async def test_get_data_cancelled_error(c, s, a): """Something somewhere in the networking stack raises CancelledError while diff --git a/distributed/tests/test_worker_metrics.py b/distributed/tests/test_worker_metrics.py index de44def09a3..7efd261f70e 100644 --- a/distributed/tests/test_worker_metrics.py +++ b/distributed/tests/test_worker_metrics.py @@ -304,7 +304,6 @@ def expand(): b.state.validate = False -@pytest.mark.repeat(100) @gen_cluster( client=True, nthreads=[("", 1)], From 2b90156c9d72efee653748d2ee7aec6104863d7e Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 20 Jul 2023 13:22:03 +0100 Subject: [PATCH 3/3] Add note --- distributed/worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/worker.py b/distributed/worker.py index 5f4ae233d9d..fe52f7960b2 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2097,6 +2097,8 @@ async def gather_dep( except Exception as e: # e.g. data failed to deserialize + # FIXME this will deadlock the cluster + # https://github.com/dask/distributed/issues/6705 logger.exception(e) self.state.log.append( ("gather-dep-failed", worker, to_gather, stimulus_id, time())