From 8e34c4d1abccd9e379f8042148cfd7c34766b3a5 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 26 Jun 2023 09:42:06 -0700 Subject: [PATCH 1/2] Replace hardcoded port in UCX test by `open_port()` --- distributed/comm/tests/test_ucx_config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/comm/tests/test_ucx_config.py b/distributed/comm/tests/test_ucx_config.py index ef594a12009..3dd3b1efe9c 100644 --- a/distributed/comm/tests/test_ucx_config.py +++ b/distributed/comm/tests/test_ucx_config.py @@ -11,7 +11,7 @@ from distributed import Client from distributed.comm.ucx import _prepare_ucx_config -from distributed.utils import get_ip +from distributed.utils import get_ip, open_port from distributed.utils_test import gen_test, popen try: @@ -113,7 +113,7 @@ def test_ucx_config_w_env_var(ucx_loop, cleanup, loop): env = os.environ.copy() env["DASK_RMM__POOL_SIZE"] = "1000.00 MB" - port = "13339" + port = str(open_port()) # Using localhost appears to be less flaky than {HOST}. Additionally, this is # closer to how other dask worker tests are written. sched_addr = f"ucx://127.0.0.1:{port}" From 4449f25464b9dd5e4861a5cfc7c24194153ee4ce Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 26 Jun 2023 12:33:56 -0700 Subject: [PATCH 2/2] Limit resources in UCX tests Some UCX tests may fail when running in gpuCI's Docker container, even though those failures are not reproducible on the host where Docker is running. This behavior probably occurs due to the limited resources available in the container, so we reduce the number of workers and threads in some of the UCX tests, which makes them unlikely to fail while still testing basic UCX functionality. 
--- distributed/comm/tests/test_ucx.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/distributed/comm/tests/test_ucx.py b/distributed/comm/tests/test_ucx.py index 185e4e1a0fd..04d4cd1833f 100644 --- a/distributed/comm/tests/test_ucx.py +++ b/distributed/comm/tests/test_ucx.py @@ -323,7 +323,9 @@ async def test_stress( async def test_simple( ucx_loop, ): - async with LocalCluster(protocol="ucx", asynchronous=True) as cluster: + async with LocalCluster( + protocol="ucx", n_workers=2, threads_per_worker=2, asynchronous=True + ) as cluster: async with Client(cluster, asynchronous=True) as client: assert cluster.scheduler_address.startswith("ucx://") assert await client.submit(lambda x: x + 1, 10) == 11 @@ -371,7 +373,9 @@ async def test_transpose( ): da = pytest.importorskip("dask.array") - async with LocalCluster(protocol="ucx", asynchronous=True) as cluster: + async with LocalCluster( + protocol="ucx", n_workers=2, threads_per_worker=2, asynchronous=True + ) as cluster: async with Client(cluster, asynchronous=True): assert cluster.scheduler_address.startswith("ucx://") x = da.ones((10000, 10000), chunks=(1000, 1000)).persist()