From 84435c1727f1f04ecece93f648b4fef31a2d7586 Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Thu, 4 Aug 2022 14:25:18 -0600
Subject: [PATCH] Fix flaky `test_worker_who_has_clears_after_failed_connection`

---
 distributed/tests/test_failed_workers.py | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py
index fc7b202a961..423deca5f91 100644
--- a/distributed/tests/test_failed_workers.py
+++ b/distributed/tests/test_failed_workers.py
@@ -19,6 +19,7 @@
 from distributed.utils import CancelledError, sync
 from distributed.utils_test import (
     BlockedGatherDep,
+    BlockedGetData,
     async_wait_for,
     captured_logger,
     cluster,
@@ -320,22 +321,18 @@ def __sizeof__(self) -> int:
 
 
 @pytest.mark.slow
-@gen_cluster(client=True)
+@gen_cluster(client=True, config={"distributed.scheduler.work-stealing": False})
 async def test_worker_who_has_clears_after_failed_connection(c, s, a, b):
     """This test is very sensitive to cluster state consistency. Timeouts often
     indicate subtle deadlocks. Be mindful when marking flaky/repeat/etc."""
-    async with Nanny(s.address, nthreads=2) as n:
+    async with Nanny(s.address, nthreads=2, worker_class=BlockedGetData) as n:
         while len(s.workers) < 3:
             await asyncio.sleep(0.01)
 
-        def slow_ser(x, delay):
-            return SlowTransmitData(x, delay=delay)
-
         n_worker_address = n.worker_address
         futures = c.map(
-            slow_ser,
+            inc,
             range(20),
-            delay=0.1,
             key=["f%d" % i for i in range(20)],
             workers=[n_worker_address],
             allow_other_workers=True,
@@ -347,9 +344,7 @@ def sink(*args):
         await wait(futures)
         result_fut = c.submit(sink, futures, workers=a.address)
 
-        with suppress(CommClosedError):
-            await c.run(os._exit, 1, workers=[n_worker_address])
-
+        await n.kill(timeout=1)
         while len(s.workers) > 2:
             await asyncio.sleep(0.01)
 