From 28df48369b673650275de2ed273a2e2cdd3a31ed Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 16 Jun 2022 19:28:37 +0200 Subject: [PATCH 01/12] Add fixtures to ensure free ports are picked --- distributed/cli/tests/__init__.py | 0 distributed/cli/tests/test_dask_scheduler.py | 136 +++++++++++++------ distributed/cli/tests/test_dask_worker.py | 42 +++--- distributed/utils.py | 1 + distributed/utils_test.py | 108 +++++++++++++++ 5 files changed, 224 insertions(+), 63 deletions(-) create mode 100644 distributed/cli/tests/__init__.py diff --git a/distributed/cli/tests/__init__.py b/distributed/cli/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index 1a1cfb30de1..ca3389a80c9 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -1,3 +1,5 @@ +import re + import psutil import pytest @@ -26,11 +28,16 @@ assert_can_connect_from_everywhere_4_6, assert_can_connect_locally_4, popen, - wait_for_log_line, ) -def test_defaults(loop): +def _get_dashboard_port(client: Client) -> int: + match = re.search(r":(\d+)\/status", client.dashboard_link) + assert match + return int(match.group(1)) + + +def test_defaults(loop, requires_default_ports): with popen(["dask-scheduler"]): async def f(): @@ -44,34 +51,36 @@ async def f(): response.raise_for_status() -def test_hostport(loop): - with popen(["dask-scheduler", "--no-dashboard", "--host", "127.0.0.1:8978"]): +def test_hostport(loop, free_port): + with popen( + ["dask-scheduler", "--no-dashboard", "--host", f"127.0.0.1:{free_port}"] + ): async def f(): # The scheduler's main port can't be contacted from the outside - await assert_can_connect_locally_4(8978, timeout=5.0) + await assert_can_connect_locally_4(int(free_port), timeout=5.0) - with Client("127.0.0.1:8978", loop=loop) as c: + with Client(f"127.0.0.1:{free_port}", loop=loop) as c: assert len(c.nthreads()) == 0 c.sync(f) -def test_no_dashboard(loop): +def test_no_dashboard(loop, requires_default_ports): with popen(["dask-scheduler", "--no-dashboard"]): with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop): response = requests.get("http://127.0.0.1:8787/status/") assert response.status_code == 404 -def test_dashboard(loop): +def test_dashboard(loop, free_port): pytest.importorskip("bokeh") - with popen(["dask-scheduler"], capture_output=True) as proc: - line = wait_for_log_line(b"dashboard at", proc.stdout) - dashboard_port = int(line.decode().split(":")[-1].strip()) + with popen( + ["dask-scheduler", "--host", f"127.0.0.1:{free_port}"], + ) as proc: - with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop): - pass + with Client(f"127.0.0.1:{free_port}", loop=loop) as c: + dashboard_port = _get_dashboard_port(c) names = ["localhost", "127.0.0.1", get_ip()] start = time() @@ -95,36 +104,48 @@ def test_dashboard(loop): requests.get(f"http://127.0.0.1:{dashboard_port}/status/") -def test_dashboard_non_standard_ports(loop): +def test_dashboard_non_standard_ports(loop, free_port, free_port2): pytest.importorskip("bokeh") with popen( - ["dask-scheduler", "--port", "3448", "--dashboard-address", ":4832"] + [ + "dask-scheduler", + "--port", + str(free_port), + "--dashboard-address", + f":{free_port2}", + ] ) as proc: - with Client("127.0.0.1:3448", loop=loop) as c: + with Client(f"127.0.0.1:{free_port}", loop=loop) as c: pass start = time() while True: try: - response = requests.get("http://localhost:4832/status/") + response = requests.get(f"http://localhost:{free_port2}/status/") assert response.ok break except Exception: sleep(0.1) assert time() < start + 20 with pytest.raises(Exception): - requests.get("http://localhost:4832/status/") + requests.get(f"http://localhost:{free_port2}/status/") @pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") -def test_dashboard_allowlist(loop): +def test_dashboard_allowlist(loop, free_port): pytest.importorskip("bokeh") with pytest.raises(Exception): requests.get("http://localhost:8787/status/").ok - with popen(["dask-scheduler"]) as proc: - with Client("127.0.0.1:%d" % Scheduler.default_port, loop=loop) as c: + with popen( + [ + "dask-scheduler", + "--port", + str(free_port), + ] + ) as proc: + with Client(f"127.0.0.1:{free_port}", loop=loop) as c: pass start = time() @@ -140,7 +161,7 @@ def test_dashboard_allowlist(loop): assert time() < start + 20 -def test_interface(loop): +def test_interface(loop, free_port): if_names = sorted(psutil.net_if_addrs()) for if_name in if_names: try: @@ -156,11 +177,26 @@ def test_interface(loop): "Available interfaces are: %s." % (if_names,) ) - with popen(["dask-scheduler", "--no-dashboard", "--interface", if_name]) as s: + with popen( + [ + "dask-scheduler", + "--port", + free_port, + "--no-dashboard", + "--interface", + if_name, + ] + ) as s: with popen( - ["dask-worker", "127.0.0.1:8786", "--no-dashboard", "--interface", if_name] + [ + "dask-worker", + f"127.0.0.1:{free_port}", + "--no-dashboard", + "--interface", + if_name, + ] ) as a: - with Client("tcp://127.0.0.1:%d" % Scheduler.default_port, loop=loop) as c: + with Client(f"tcp://127.0.0.1:{free_port}", loop=loop) as c: start = time() while not len(c.nthreads()): sleep(0.1) @@ -171,7 +207,7 @@ def test_interface(loop): @pytest.mark.flaky(reruns=10, reruns_delay=5) -def test_pid_file(loop): +def test_pid_file(loop, free_port): def check_pidfile(proc, pidfile): start = time() while not os.path.exists(pidfile): @@ -199,7 +235,13 @@ def check_pidfile(proc, pidfile): with tmpfile() as w: with popen( - ["dask-worker", "127.0.0.1:8786", "--pid-file", w, "--no-dashboard"] + [ + "dask-worker", + f"127.0.0.1:{free_port}", + "--pid-file", + w, + "--no-dashboard", + ] ) as worker: check_pidfile(worker, w) @@ -214,15 +256,20 @@ def test_scheduler_port_zero(loop): assert c.scheduler.port != 8786 -def test_dashboard_port_zero(loop): +def test_dashboard_port_zero(loop, free_port): pytest.importorskip("bokeh") with popen( - ["dask-scheduler", "--dashboard-address", ":0"], - capture_output=True, - ) as proc: - line = wait_for_log_line(b"dashboard at", proc.stdout) - dashboard_port = int(line.decode().split(":")[-1].strip()) - assert dashboard_port != 0 + [ + "dask-scheduler", + "--host", + f"127.0.0.1:{free_port}", + "--dashboard-address", + ":0", + ], + ): + with Client(f"tcp://127.0.0.1:{free_port}", loop=loop) as c: + port = _get_dashboard_port(c) + assert port > 0 PRELOAD_TEXT = """ @@ -434,16 +481,18 @@ def raise_ki(): signal.signal(signal.SIGINT, original_handler) -def test_multiple_workers_2(loop): +def test_multiple_workers_2(loop, free_port): text = """ def dask_setup(worker): worker.foo = 'setup' """ - with popen(["dask-scheduler", "--no-dashboard"]) as s: + with popen( + ["dask-scheduler", "--no-dashboard", "--host", f"127.0.0.1:{free_port}"] + ) as s: with popen( [ "dask-worker", - "localhost:8786", + f"localhost:{free_port}", "--no-dashboard", "--preload", text, @@ -451,7 +500,7 @@ def dask_setup(worker): text, ] ) as a: - with Client("127.0.0.1:8786", loop=loop) as c: + with Client(f"127.0.0.1:{free_port}", loop=loop) as c: c.wait_for_workers(1) [foo] = c.run(lambda dask_worker: dask_worker.foo).values() assert foo == "setup" @@ -459,11 +508,12 @@ def dask_setup(worker): assert foo == "setup" -def test_multiple_workers(loop): - with popen(["dask-scheduler", "--no-dashboard"]) as s: - with popen(["dask-worker", "localhost:8786", "--no-dashboard"]) as a: - with popen(["dask-worker", "localhost:8786", "--no-dashboard"]) as b: - with Client("127.0.0.1:%d" % Scheduler.default_port, loop=loop) as c: +def test_multiple_workers(loop, free_port): + scheduler_address = f"127.0.0.1:{free_port}" + with popen(["dask-scheduler", "--no-dashboard", "--host", scheduler_address]) as s: + with popen(["dask-worker", scheduler_address, "--no-dashboard"]) as a: + with popen(["dask-worker", scheduler_address, "--no-dashboard"]) as b: + with Client(scheduler_address, loop=loop) as c: start = time() while len(c.nthreads()) < 2: sleep(0.1) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 4cf32ddffa5..d243813724e 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -18,7 +18,6 @@ from distributed.compatibility import LINUX, WINDOWS from distributed.deploy.utils import nprocesses_nthreads from distributed.metrics import time -from distributed.utils import open_port from distributed.utils_test import gen_cluster, popen, requires_ipv6, wait_for_log_line @@ -162,7 +161,7 @@ def test_apportion_ports_bad(): @pytest.mark.slow @gen_cluster(client=True, nthreads=[]) -async def test_nanny_worker_ports(c, s): +async def test_nanny_worker_ports(c, s, free_port, free_port2): with popen( [ "dask-worker", @@ -170,15 +169,18 @@ async def test_nanny_worker_ports(c, s): "--host", "127.0.0.1", "--worker-port", - "9684", + free_port, "--nanny-port", - "5273", + free_port2, "--no-dashboard", ] ): await c.wait_for_workers(1) d = await c.scheduler.identity() - assert d["workers"]["tcp://127.0.0.1:9684"]["nanny"] == "tcp://127.0.0.1:5273" + assert ( + d["workers"][f"tcp://127.0.0.1:{free_port}"]["nanny"] + == f"tcp://127.0.0.1:{free_port2}" + ) @pytest.mark.slow @@ -346,8 +348,8 @@ def test_scheduler_file(loop, nanny): @pytest.mark.slow -def test_scheduler_address_env(loop, monkeypatch): - monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", "tcp://127.0.0.1:8786") +def test_scheduler_address_env(loop, monkeypatch, free_port): + monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", "tcp://127.0.0.1:{free_port}") with popen(["dask-scheduler", "--no-dashboard"]): with popen(["dask-worker", "--no-dashboard"]): with Client(os.environ["DASK_SCHEDULER_ADDRESS"], loop=loop) as c: @@ -418,11 +420,10 @@ async def test_worker_cli_nworkers_with_nprocs_is_an_error(s): @pytest.mark.slow @pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) -@pytest.mark.parametrize( - "listen_address", ["tcp://0.0.0.0:39837", "tcp://127.0.0.2:39837"] -) +@pytest.mark.parametrize("listen_address", ["tcp://0.0.0.0:", "tcp://127.0.0.2:"]) @gen_cluster(client=True, nthreads=[]) -async def test_contact_listen_address(c, s, nanny, listen_address): +async def test_contact_listen_address(c, s, nanny, listen_address, free_port): + listen_address += free_port with popen( [ "dask-worker", @@ -430,14 +431,14 @@ async def test_contact_listen_address(c, s, nanny, listen_address): nanny, "--no-dashboard", "--contact-address", - "tcp://127.0.0.2:39837", + f"tcp://127.0.0.2:{free_port}", "--listen-address", listen_address, ] ): await c.wait_for_workers(1) info = c.scheduler_info() - assert info["workers"].keys() == {"tcp://127.0.0.2:39837"} + assert info["workers"].keys() == {f"tcp://127.0.0.2:{free_port}"} # roundtrip works assert await c.submit(lambda x: x + 1, 10) == 11 @@ -445,7 +446,7 @@ async def test_contact_listen_address(c, s, nanny, listen_address): def func(dask_worker): return dask_worker.listener.listen_address - assert await c.run(func) == {"tcp://127.0.0.2:39837": listen_address} + assert await c.run(func) == {f"tcp://127.0.0.2:{free_port}": listen_address} @pytest.mark.slow @@ -453,7 +454,8 @@ def func(dask_worker): @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @pytest.mark.parametrize("listen_address", ["tcp://:39838", "tcp://[::1]:39838"]) @gen_cluster(client=True, nthreads=[]) -async def test_listen_address_ipv6(c, s, nanny, listen_address): +async def test_listen_address_ipv6(c, s, nanny, listen_address, free_port): + listen_address += free_port with popen( [ "dask-worker", @@ -468,8 +470,8 @@ async def test_listen_address_ipv6(c, s, nanny, listen_address): # listening only on IPv6. bind_all = "[::1]" not in listen_address expected_ip = "127.0.0.1" if bind_all else "[::1]" - expected_name = f"tcp://{expected_ip}:39838" - expected_listen = "tcp://0.0.0.0:39838" if bind_all else listen_address + expected_name = f"tcp://{expected_ip}:{free_port}" + expected_listen = f"tcp://0.0.0.0:{free_port}" if bind_all else listen_address await c.wait_for_workers(1) info = c.scheduler_info() @@ -505,7 +507,7 @@ def func(dask_worker): @gen_cluster( client=True, nthreads=[], scheduler_kwargs={"dashboard_address": "localhost:8787"} ) -async def test_dashboard_non_standard_ports(c, s): +async def test_dashboard_non_standard_ports(c, s, requires_default_ports): pytest.importorskip("bokeh") requests = pytest.importorskip("requests") @@ -695,9 +697,9 @@ async def test_signal_handling(c, s, nanny, sig): @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) -def test_error_during_startup(monkeypatch, nanny): +def test_error_during_startup(monkeypatch, nanny, free_port): # see https://github.com/dask/distributed/issues/6320 - scheduler_port = str(open_port()) + scheduler_port = free_port scheduler_addr = f"tcp://127.0.0.1:{scheduler_port}" monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", scheduler_addr) diff --git a/distributed/utils.py b/distributed/utils.py index efd0daa8863..a9be0e18f40 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1059,6 +1059,7 @@ def open_port(host=""): """ # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((host, 0)) s.listen(1) port = s.getsockname()[1] diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 8c69ad073fe..5aca245e40e 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -68,6 +68,7 @@ get_mp_context, iscoroutinefunction, log_errors, + open_port, reset_logger_locks, sync, ) @@ -2428,3 +2429,110 @@ def ws(): state = WorkerState(address="127.0.0.1:1", transition_counter_max=50_000) yield state state.validate_state() + + +_ports_in_use = set() + + +@pytest.fixture() +def name_of_test(request): + return f"{request.node.nodeid}" + + +try: + # If we're not running with pytest-xdist we'll need to register this testrun_uid + import xdist # noqa +except ImportError: + + @pytest.fixture(scope="session") + def testrun_uid(): + return None + + +@pytest.fixture(scope="session", autouse=True) +def port_lock_filename(testrun_uid, tmpdir_factory): + if testrun_uid: + tmpdir = tmpdir_factory.mktemp("_distributed_test_port_lock", numbered=False) + + yield tmpdir / testrun_uid + else: + yield None + + +@pytest.fixture(scope="session") +def global_port_lock(port_lock_filename): + if port_lock_filename: + import locket + + lock = locket.lock_file(port_lock_filename) + yield lock + else: + + @contextmanager + def dummy_lock(): + yield + + yield dummy_lock() + + +@contextmanager +def _get_open_port(global_port_lock): + while True: + with global_port_lock: + port = open_port() + if port not in _ports_in_use: + _ports_in_use.add(port) + break + try: + yield port + finally: + _ports_in_use.discard(port) + + +@pytest.fixture() +def free_port(global_port_lock, name_of_test): + with _get_open_port(global_port_lock) as port: + print(f"Using free port {port} for test {name_of_test}") + yield str(port) + + +@pytest.fixture() +def free_port2(global_port_lock, name_of_test): + with _get_open_port(global_port_lock) as port: + print(f"Using second free port {port} for test {name_of_test}") + yield str(port) + + +@pytest.fixture() +def requires_default_ports(name_of_test, global_port_lock): + start = time() + + @contextmanager + def _bind_port(port): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(("", port)) + s.listen(1) + try: + yield s + finally: + s.close() + + default_ports = [8786, 8787] + + while time() - start < 5: + try: + with global_port_lock: + with contextlib.ExitStack() as stack: + for port in default_ports: + stack.enter_context(_bind_port(port)) + break + except OSError as err: + if err.errno == 48: + print( + f"Address already in use. Waiting before running test {name_of_test}" + ) + sleep(1) + continue + else: + raise TimeoutError(f"Default ports didn't open up in time for {name_of_test}") From 768b81665f73324dbdc1fbe7668b09931e3f255f Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 21 Jun 2022 15:59:07 +0200 Subject: [PATCH 02/12] use nullctx --- distributed/utils_test.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 5aca245e40e..2cea3fc734b 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -2440,7 +2440,7 @@ def name_of_test(request): try: - # If we're not running with pytest-xdist we'll need to register this testrun_uid + # # If we're not running with pytest-xdist we'll need to register this testrun_uid import xdist # noqa except ImportError: @@ -2467,12 +2467,7 @@ def global_port_lock(port_lock_filename): lock = locket.lock_file(port_lock_filename) yield lock else: - - @contextmanager - def dummy_lock(): - yield - - yield dummy_lock() + yield contextlib.nullcontext() @contextmanager From 3610b8fa0e96039b44244bd9663f895bc74775d4 Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 21 Jun 2022 20:12:51 +0200 Subject: [PATCH 03/12] Use ephemeral-port-reserve-code --- distributed/cli/tests/test_dask_scheduler.py | 173 ++++++++++--------- distributed/cli/tests/test_dask_worker.py | 41 +++-- distributed/utils.py | 54 ++++-- distributed/utils_test.py | 89 ++-------- 4 files changed, 172 insertions(+), 185 deletions(-) diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index ca3389a80c9..487a46f6f17 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -23,7 +23,7 @@ from distributed import Client, Scheduler from distributed.compatibility import LINUX, WINDOWS from distributed.metrics import time -from distributed.utils import get_ip, get_ip_interface +from distributed.utils import get_ip, get_ip_interface, open_port from distributed.utils_test import ( assert_can_connect_from_everywhere_4_6, assert_can_connect_locally_4, @@ -46,21 +46,18 @@ async def f(): with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop) as c: c.sync(f) + assert _get_dashboard_port(c) == 8787 - response = requests.get("http://127.0.0.1:8787/status/") - response.raise_for_status() - -def test_hostport(loop, free_port): - with popen( - ["dask-scheduler", "--no-dashboard", "--host", f"127.0.0.1:{free_port}"] - ): +def test_hostport(loop): + port = open_port() + with popen(["dask-scheduler", "--no-dashboard", "--host", f"127.0.0.1:{port}"]): async def f(): # The scheduler's main port can't be contacted from the outside - await assert_can_connect_locally_4(int(free_port), timeout=5.0) + await assert_can_connect_locally_4(int(port), timeout=5.0) - with Client(f"127.0.0.1:{free_port}", loop=loop) as c: + with Client(f"127.0.0.1:{port}", loop=loop) as c: assert len(c.nthreads()) == 0 c.sync(f) @@ -72,14 +69,15 @@ def test_no_dashboard(loop, requires_default_ports): assert response.status_code == 404 -def test_dashboard(loop, free_port): +def test_dashboard(loop): pytest.importorskip("bokeh") + port = open_port() with popen( - ["dask-scheduler", "--host", f"127.0.0.1:{free_port}"], - ) as proc: + ["dask-scheduler", "--host", f"127.0.0.1:{port}"], + ): - with Client(f"127.0.0.1:{free_port}", loop=loop) as c: + with Client(f"127.0.0.1:{port}", loop=loop) as c: dashboard_port = _get_dashboard_port(c) names = ["localhost", "127.0.0.1", get_ip()] @@ -104,48 +102,50 @@ def test_dashboard(loop, free_port): requests.get(f"http://127.0.0.1:{dashboard_port}/status/") -def test_dashboard_non_standard_ports(loop, free_port, free_port2): +def test_dashboard_non_standard_ports(loop): pytest.importorskip("bokeh") - + port1 = open_port() + port2 = open_port() with popen( [ "dask-scheduler", "--port", - str(free_port), + str(port1), "--dashboard-address", - f":{free_port2}", + f":{port2}", ] ) as proc: - with Client(f"127.0.0.1:{free_port}", loop=loop) as c: + with Client(f"127.0.0.1:{port1}", loop=loop) as c: pass start = time() while True: try: - response = requests.get(f"http://localhost:{free_port2}/status/") + response = requests.get(f"http://localhost:{port2}/status/") assert response.ok break except Exception: sleep(0.1) assert time() < start + 20 with pytest.raises(Exception): - requests.get(f"http://localhost:{free_port2}/status/") + requests.get(f"http://localhost:{port2}/status/") @pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") -def test_dashboard_allowlist(loop, free_port): +def test_dashboard_allowlist(loop): pytest.importorskip("bokeh") with pytest.raises(Exception): requests.get("http://localhost:8787/status/").ok + port = open_port() with popen( [ "dask-scheduler", "--port", - str(free_port), + str(port), ] ) as proc: - with Client(f"127.0.0.1:{free_port}", loop=loop) as c: + with Client(f"127.0.0.1:{port}", loop=loop) as c: pass start = time() @@ -161,7 +161,7 @@ def test_dashboard_allowlist(loop, free_port): assert time() < start + 20 -def test_interface(loop, free_port): +def test_interface(loop): if_names = sorted(psutil.net_if_addrs()) for if_name in if_names: try: @@ -177,11 +177,12 @@ def test_interface(loop, free_port): "Available interfaces are: %s." % (if_names,) ) + port = open_port() with popen( [ "dask-scheduler", "--port", - free_port, + str(port), "--no-dashboard", "--interface", if_name, @@ -190,13 +191,13 @@ def test_interface(loop, free_port): with popen( [ "dask-worker", - f"127.0.0.1:{free_port}", + f"127.0.0.1:{port}", "--no-dashboard", "--interface", if_name, ] ) as a: - with Client(f"tcp://127.0.0.1:{free_port}", loop=loop) as c: + with Client(f"tcp://127.0.0.1:{port}", loop=loop) as c: start = time() while not len(c.nthreads()): sleep(0.1) @@ -207,7 +208,9 @@ def test_interface(loop, free_port): @pytest.mark.flaky(reruns=10, reruns_delay=5) -def test_pid_file(loop, free_port): +def test_pid_file(loop): + port = open_port() + def check_pidfile(proc, pidfile): start = time() while not os.path.exists(pidfile): @@ -237,7 +240,7 @@ def check_pidfile(proc, pidfile): with popen( [ "dask-worker", - f"127.0.0.1:{free_port}", + f"127.0.0.1:{port}", "--pid-file", w, "--no-dashboard", @@ -256,18 +259,19 @@ def test_scheduler_port_zero(loop): assert c.scheduler.port != 8786 -def test_dashboard_port_zero(loop, free_port): +def test_dashboard_port_zero(loop): pytest.importorskip("bokeh") + port = open_port() with popen( [ "dask-scheduler", "--host", - f"127.0.0.1:{free_port}", + f"127.0.0.1:{port}", "--dashboard-address", ":0", ], ): - with Client(f"tcp://127.0.0.1:{free_port}", loop=loop) as c: + with Client(f"tcp://127.0.0.1:{port}", loop=loop) as c: port = _get_dashboard_port(c) assert port > 0 @@ -284,70 +288,78 @@ def get_scheduler_address(): """ -def test_preload_file(loop): +def test_preload_file(loop, tmp_path): def check_scheduler(): import scheduler_info return scheduler_info.get_scheduler_address() - tmpdir = tempfile.mkdtemp() - try: - path = os.path.join(tmpdir, "scheduler_info.py") - with open(path, "w") as f: - f.write(PRELOAD_TEXT) - with tmpfile() as fn: - with popen(["dask-scheduler", "--scheduler-file", fn, "--preload", path]): - with Client(scheduler_file=fn, loop=loop) as c: - assert c.run_on_scheduler(check_scheduler) == c.scheduler.address - finally: - shutil.rmtree(tmpdir) + path = tmp_path / "scheduler_info.py" + with open(path, "w") as f: + f.write(PRELOAD_TEXT) + with tmpfile() as fn: + with popen( + [ + "dask-scheduler", + "--scheduler-file", + fn, + "--preload", + path, + "--port", + str(open_port()), + ] + ): + with Client(scheduler_file=fn, loop=loop) as c: + assert c.run_on_scheduler(check_scheduler) == c.scheduler.address -def test_preload_module(loop): +def test_preload_module(loop, tmp_path): def check_scheduler(): import scheduler_info return scheduler_info.get_scheduler_address() - tmpdir = tempfile.mkdtemp() - try: - path = os.path.join(tmpdir, "scheduler_info.py") - with open(path, "w") as f: - f.write(PRELOAD_TEXT) - env = os.environ.copy() - if "PYTHONPATH" in env: - env["PYTHONPATH"] = tmpdir + ":" + env["PYTHONPATH"] - else: - env["PYTHONPATH"] = tmpdir - with tmpfile() as fn: - with popen( - [ - "dask-scheduler", - "--scheduler-file", - fn, - "--preload", - "scheduler_info", - ], - env=env, - ): - with Client(scheduler_file=fn, loop=loop) as c: - assert c.run_on_scheduler(check_scheduler) == c.scheduler.address - finally: - shutil.rmtree(tmpdir) + path = tmp_path / "scheduler_info.py" + with open(path, "w") as f: + f.write(PRELOAD_TEXT) + env = os.environ.copy() + if "PYTHONPATH" in env: + env["PYTHONPATH"] = tmp_path + ":" + env["PYTHONPATH"] + else: + env["PYTHONPATH"] = tmp_path + with tmpfile() as fn: + with popen( + [ + "dask-scheduler", + "--scheduler-file", + fn, + "--preload", + "scheduler_info", + "--port", + str(open_port()), + ], + env=env, + ): + with Client(scheduler_file=fn, loop=loop) as c: + assert c.run_on_scheduler(check_scheduler) == c.scheduler.address def test_preload_remote_module(loop, tmp_path): with open(tmp_path / "scheduler_info.py", "w") as f: f.write(PRELOAD_TEXT) - - with popen([sys.executable, "-m", "http.server", "9382"], cwd=tmp_path): + http_server_port = open_port() + with popen( + [sys.executable, "-m", "http.server", str(http_server_port)], cwd=tmp_path + ): with popen( [ "dask-scheduler", "--scheduler-file", str(tmp_path / "scheduler-file.json"), "--preload", - "http://localhost:9382/scheduler_info.py", + f"http://localhost:{http_server_port}/scheduler_info.py", + "--port", + str(open_port()), ] ) as proc: with Client( @@ -481,18 +493,19 @@ def raise_ki(): signal.signal(signal.SIGINT, original_handler) -def test_multiple_workers_2(loop, free_port): +def test_multiple_workers_2(loop): text = """ def dask_setup(worker): worker.foo = 'setup' """ + port = open_port() with popen( - ["dask-scheduler", "--no-dashboard", "--host", f"127.0.0.1:{free_port}"] + ["dask-scheduler", "--no-dashboard", "--host", f"127.0.0.1:{port}"] ) as s: with popen( [ "dask-worker", - f"localhost:{free_port}", + f"localhost:{port}", "--no-dashboard", "--preload", text, @@ -500,7 +513,7 @@ def dask_setup(worker): text, ] ) as a: - with Client(f"127.0.0.1:{free_port}", loop=loop) as c: + with Client(f"127.0.0.1:{port}", loop=loop) as c: c.wait_for_workers(1) [foo] = c.run(lambda dask_worker: dask_worker.foo).values() assert foo == "setup" @@ -508,8 +521,8 @@ def dask_setup(worker): assert foo == "setup" -def test_multiple_workers(loop, free_port): - scheduler_address = f"127.0.0.1:{free_port}" +def test_multiple_workers(loop): + scheduler_address = f"127.0.0.1:{open_port()}" with popen(["dask-scheduler", "--no-dashboard", "--host", scheduler_address]) as s: with popen(["dask-worker", scheduler_address, "--no-dashboard"]) as a: with popen(["dask-worker", scheduler_address, "--no-dashboard"]) as b: diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index d243813724e..bade4eafe0a 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -18,6 +18,7 @@ from distributed.compatibility import LINUX, WINDOWS from distributed.deploy.utils import nprocesses_nthreads from distributed.metrics import time +from distributed.utils import open_port from distributed.utils_test import gen_cluster, popen, requires_ipv6, wait_for_log_line @@ -161,7 +162,9 @@ def test_apportion_ports_bad(): @pytest.mark.slow @gen_cluster(client=True, nthreads=[]) -async def test_nanny_worker_ports(c, s, free_port, free_port2): +async def test_nanny_worker_ports(c, s): + worker_port = open_port() + nanny_port = open_port() with popen( [ "dask-worker", @@ -169,17 +172,17 @@ async def test_nanny_worker_ports(c, s, free_port, free_port2): "--host", "127.0.0.1", "--worker-port", - free_port, + worker_port, "--nanny-port", - free_port2, + nanny_port, "--no-dashboard", ] ): await c.wait_for_workers(1) d = await c.scheduler.identity() assert ( - d["workers"][f"tcp://127.0.0.1:{free_port}"]["nanny"] - == f"tcp://127.0.0.1:{free_port2}" + d["workers"][f"tcp://127.0.0.1:{worker_port}"]["nanny"] + == f"tcp://127.0.0.1:{nanny_port}" ) @@ -348,8 +351,8 @@ def test_scheduler_file(loop, nanny): @pytest.mark.slow -def test_scheduler_address_env(loop, monkeypatch, free_port): - monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", "tcp://127.0.0.1:{free_port}") +def test_scheduler_address_env(loop, monkeypatch): + monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", f"tcp://127.0.0.1:{open_port()}") with popen(["dask-scheduler", "--no-dashboard"]): with popen(["dask-worker", "--no-dashboard"]): with Client(os.environ["DASK_SCHEDULER_ADDRESS"], loop=loop) as c: @@ -422,8 +425,9 @@ async def test_worker_cli_nworkers_with_nprocs_is_an_error(s): @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @pytest.mark.parametrize("listen_address", ["tcp://0.0.0.0:", "tcp://127.0.0.2:"]) @gen_cluster(client=True, nthreads=[]) -async def test_contact_listen_address(c, s, nanny, listen_address, free_port): - listen_address += free_port +async def test_contact_listen_address(c, s, nanny, listen_address): + port = open_port() + listen_address += str(port) with popen( [ "dask-worker", @@ -431,14 +435,14 @@ async def test_contact_listen_address(c, s, nanny, listen_address, free_port): nanny, "--no-dashboard", "--contact-address", - f"tcp://127.0.0.2:{free_port}", + f"tcp://127.0.0.2:{port}", "--listen-address", listen_address, ] ): await c.wait_for_workers(1) info = c.scheduler_info() - assert info["workers"].keys() == {f"tcp://127.0.0.2:{free_port}"} + assert info["workers"].keys() == {f"tcp://127.0.0.2:{port}"} # roundtrip works assert await c.submit(lambda x: x + 1, 10) == 11 @@ -446,7 +450,7 @@ async def test_contact_listen_address(c, s, nanny, listen_address, free_port): def func(dask_worker): return dask_worker.listener.listen_address - assert await c.run(func) == {f"tcp://127.0.0.2:{free_port}": listen_address} + assert await c.run(func) == {f"tcp://127.0.0.2:{port}": listen_address} @pytest.mark.slow @@ -454,8 +458,9 @@ def func(dask_worker): @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @pytest.mark.parametrize("listen_address", ["tcp://:39838", "tcp://[::1]:39838"]) @gen_cluster(client=True, nthreads=[]) -async def test_listen_address_ipv6(c, s, nanny, listen_address, free_port): - listen_address += free_port +async def test_listen_address_ipv6(c, s, nanny, listen_address): + port = open_port() + listen_address += port with popen( [ "dask-worker", @@ -470,8 +475,8 @@ async def test_listen_address_ipv6(c, s, nanny, listen_address, free_port): # listening only on IPv6. bind_all = "[::1]" not in listen_address expected_ip = "127.0.0.1" if bind_all else "[::1]" - expected_name = f"tcp://{expected_ip}:{free_port}" - expected_listen = f"tcp://0.0.0.0:{free_port}" if bind_all else listen_address + expected_name = f"tcp://{expected_ip}:{port}" + expected_listen = f"tcp://0.0.0.0:{port}" if bind_all else listen_address await c.wait_for_workers(1) info = c.scheduler_info() @@ -697,9 +702,9 @@ async def test_signal_handling(c, s, nanny, sig): @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) -def test_error_during_startup(monkeypatch, nanny, free_port): +def test_error_during_startup(monkeypatch, nanny): # see https://github.com/dask/distributed/issues/6320 - scheduler_port = free_port + scheduler_port = open_port() scheduler_addr = f"tcp://127.0.0.1:{scheduler_port}" monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", scheduler_addr) diff --git a/distributed/utils.py b/distributed/utils.py index a9be0e18f40..677b52b47e7 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1,7 +1,9 @@ from __future__ import annotations import asyncio +import contextlib import contextvars +import errno import functools import importlib import inspect @@ -1051,20 +1053,48 @@ def ensure_memoryview(obj): return mv -def open_port(host=""): - """Return a probably-open port +def open_port(host="", port=0): + """Bind to an ephemeral port, force it into the TIME_WAIT state, and unbind it. - There is a chance that this port will be taken by the operating system soon - after returning from this function. + This means that further ephemeral port alloctions won't pick this "reserved" port, + but subprocesses can still bind to it explicitly, given that they use SO_REUSEADDR. + By default on linux you have a grace period of 60 seconds to reuse this port. + To check your own particular value: + $ cat /proc/sys/net/ipv4/tcp_fin_timeout + 60 + + By default, the port will be reserved for localhost (aka 127.0.0.1). + To reserve a port for a different ip, provide the ip as the first argument. + Note that IP 0.0.0.0 is interpreted as localhost. + + vendored from https://github.com/Yelp/ephemeral-port-reserve/commit/403e67e9db1f49466c4bd29a8861004864168f68 + + see license (MIT) https://github.com/Yelp/ephemeral-port-reserve/blob/403e67e9db1f49466c4bd29a8861004864168f68/LICENSE """ - # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind((host, 0)) - s.listen(1) - port = s.getsockname()[1] - s.close() - return port + port = int(port) + with contextlib.closing(socket.socket()) as s: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + s.bind((host, port)) + except OSError as e: + # socket.error: EADDRINUSE Address already in use + + if e.errno == errno.EADDRINUSE and port != 0: + s.bind((host, 0)) + else: + raise + + # the connect below deadlocks on kernel >= 4.4.0 unless this arg is greater than zero + s.listen(1) + + sockname = s.getsockname() + + # these three are necessary just to get the port into a TIME_WAIT state + with contextlib.closing(socket.socket()) as s2: + s2.connect(sockname) + sock, _ = s.accept() + with contextlib.closing(sock): + return sockname[1] def import_file(path: str) -> list[ModuleType]: diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 2cea3fc734b..c227f4688b4 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -4,6 +4,7 @@ import concurrent.futures import contextlib import copy +import errno import functools import gc import inspect @@ -68,7 +69,6 @@ get_mp_context, iscoroutinefunction, log_errors, - open_port, reset_logger_locks, sync, ) @@ -2431,99 +2431,36 @@ def ws(): state.validate_state() -_ports_in_use = set() - - @pytest.fixture() def name_of_test(request): return f"{request.node.nodeid}" -try: - # # If we're not running with pytest-xdist we'll need to register this testrun_uid - import xdist # noqa -except ImportError: - - @pytest.fixture(scope="session") - def testrun_uid(): - return None - - -@pytest.fixture(scope="session", autouse=True) -def port_lock_filename(testrun_uid, tmpdir_factory): - if testrun_uid: - tmpdir = tmpdir_factory.mktemp("_distributed_test_port_lock", numbered=False) - - yield tmpdir / testrun_uid - else: - yield None - - -@pytest.fixture(scope="session") -def global_port_lock(port_lock_filename): - if port_lock_filename: - import locket - - lock = locket.lock_file(port_lock_filename) - yield lock - else: - yield contextlib.nullcontext() - - -@contextmanager -def _get_open_port(global_port_lock): - while True: - with global_port_lock: - port = open_port() - if port not in _ports_in_use: - _ports_in_use.add(port) - break - try: - yield port - finally: - _ports_in_use.discard(port) - - -@pytest.fixture() -def free_port(global_port_lock, name_of_test): - with _get_open_port(global_port_lock) as port: - print(f"Using free port {port} for test {name_of_test}") - yield str(port) - - -@pytest.fixture() -def free_port2(global_port_lock, name_of_test): - with _get_open_port(global_port_lock) as port: - print(f"Using second free port {port} for test {name_of_test}") - yield str(port) - - @pytest.fixture() -def requires_default_ports(name_of_test, global_port_lock): +def requires_default_ports(name_of_test): start = time() @contextmanager def _bind_port(port): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind(("", port)) - s.listen(1) try: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(("", port)) + s.listen(1) yield s finally: s.close() - default_ports = [8786, 8787] + default_ports = [8786] - while time() - start < 5: + while time() - start < _TEST_TIMEOUT: try: - with global_port_lock: - with contextlib.ExitStack() as stack: - for port in default_ports: - stack.enter_context(_bind_port(port)) - break + with contextlib.ExitStack() as stack: + for port in default_ports: + stack.enter_context(_bind_port(port=port)) + break except OSError as err: - if err.errno == 48: + if err.errno == errno.EADDRINUSE: print( f"Address already in use. Waiting before running test {name_of_test}" ) @@ -2531,3 +2468,5 @@ def _bind_port(port): continue else: raise TimeoutError(f"Default ports didn't open up in time for {name_of_test}") + + yield From 6537d5c310c6e649018b30307c92504fe1a5b145 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 22 Jun 2022 11:06:19 +0200 Subject: [PATCH 04/12] Revert changes to open_port --- distributed/cli/tests/test_dask_worker.py | 4 +- distributed/utils.py | 49 +++++------------------ 2 files changed, 11 insertions(+), 42 deletions(-) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index bade4eafe0a..ce8e0e233f8 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -172,9 +172,9 @@ async def test_nanny_worker_ports(c, s): "--host", "127.0.0.1", "--worker-port", - worker_port, + str(worker_port), "--nanny-port", - nanny_port, + str(nanny_port), "--no-dashboard", ] ): diff --git a/distributed/utils.py b/distributed/utils.py index 677b52b47e7..a48dc8bc51e 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -3,7 +3,6 @@ import asyncio import contextlib import contextvars -import errno import functools import importlib import inspect @@ -1053,48 +1052,18 @@ def ensure_memoryview(obj): return mv -def open_port(host="", port=0): - """Bind to an ephemeral port, force it into the TIME_WAIT state, and unbind it. +def open_port(host: str = "") -> int: + """Return a probably-open port - This means that further ephemeral port alloctions won't pick this "reserved" port, - but subprocesses can still bind to it explicitly, given that they use SO_REUSEADDR. - By default on linux you have a grace period of 60 seconds to reuse this port. - To check your own particular value: - $ cat /proc/sys/net/ipv4/tcp_fin_timeout - 60 - - By default, the port will be reserved for localhost (aka 127.0.0.1). - To reserve a port for a different ip, provide the ip as the first argument. - Note that IP 0.0.0.0 is interpreted as localhost. - - vendored from https://github.com/Yelp/ephemeral-port-reserve/commit/403e67e9db1f49466c4bd29a8861004864168f68 - - see license (MIT) https://github.com/Yelp/ephemeral-port-reserve/blob/403e67e9db1f49466c4bd29a8861004864168f68/LICENSE + There is a chance that this port will be taken by the operating system soon + after returning from this function. """ - port = int(port) - with contextlib.closing(socket.socket()) as s: - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - try: - s.bind((host, port)) - except OSError as e: - # socket.error: EADDRINUSE Address already in use - - if e.errno == errno.EADDRINUSE and port != 0: - s.bind((host, 0)) - else: - raise - - # the connect below deadlocks on kernel >= 4.4.0 unless this arg is greater than zero + # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python + with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind((host, 0)) s.listen(1) - - sockname = s.getsockname() - - # these three are necessary just to get the port into a TIME_WAIT state - with contextlib.closing(socket.socket()) as s2: - s2.connect(sockname) - sock, _ = s.accept() - with contextlib.closing(sock): - return sockname[1] + port = s.getsockname()[1] + return port def import_file(path: str) -> list[ModuleType]: From 9a3ebad3187bbae791c06a548c25bf7673869d4d Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 22 Jun 2022 11:14:05 +0200 Subject: [PATCH 05/12] Fix more tests --- distributed/cli/tests/test_dask_worker.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index ce8e0e233f8..e04fd2a1a92 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -19,7 +19,13 @@ from distributed.deploy.utils import nprocesses_nthreads from distributed.metrics import time from distributed.utils import open_port -from distributed.utils_test import gen_cluster, popen, requires_ipv6, wait_for_log_line +from distributed.utils_test import ( + gen_cluster, + inc, + popen, + requires_ipv6, + wait_for_log_line, +) @pytest.mark.parametrize( @@ -352,8 +358,10 @@ def test_scheduler_file(loop, nanny): @pytest.mark.slow def test_scheduler_address_env(loop, monkeypatch): - monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", f"tcp://127.0.0.1:{open_port()}") - with popen(["dask-scheduler", "--no-dashboard"]): + port = open_port() + monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", f"tcp://127.0.0.1:{port}") + # The env var is only picked up by the dask-worker command + with popen(["dask-scheduler", "--no-dashboard", "--port", str(port)]): with popen(["dask-worker", "--no-dashboard"]): with Client(os.environ["DASK_SCHEDULER_ADDRESS"], loop=loop) as c: start = time() From f52d3d172085a3948aec45e578b857798f52c3be Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 22 Jun 2022 11:28:26 +0200 Subject: [PATCH 06/12] Eliminate all occurences of hard-coded port 8786 --- distributed/cli/tests/test_dask_ssh.py | 4 +- distributed/cli/tests/test_tls_cli.py | 51 +++++++++++++++++++++----- distributed/deploy/tests/test_local.py | 12 +++--- distributed/tests/test_preload.py | 16 +++++--- distributed/tests/test_queues.py | 6 ++- distributed/tests/test_variable.py | 6 ++- 6 files changed, 68 insertions(+), 27 deletions(-) diff --git a/distributed/cli/tests/test_dask_ssh.py b/distributed/cli/tests/test_dask_ssh.py index 1a2718dca00..49547e059f7 100644 --- a/distributed/cli/tests/test_dask_ssh.py +++ b/distributed/cli/tests/test_dask_ssh.py @@ -20,7 +20,7 @@ def test_version_option(): @pytest.mark.slow -def test_ssh_cli_nprocs_renamed_to_nworkers(loop): +def test_ssh_cli_nprocs_renamed_to_nworkers(loop, requires_default_port): with popen( ["dask-ssh", "--nprocs=2", "--nohost", "localhost"], capture_output=True, @@ -33,7 +33,7 @@ def test_ssh_cli_nprocs_renamed_to_nworkers(loop): wait_for_log_line(b"renamed to --nworkers", proc.stdout, max_lines=15) -def test_ssh_cli_nworkers_with_nprocs_is_an_error(): +def test_ssh_cli_nworkers_with_nprocs_is_an_error(requires_default_port): with popen( ["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"], capture_output=True, diff --git a/distributed/cli/tests/test_tls_cli.py b/distributed/cli/tests/test_tls_cli.py index f88d961bfbe..fbd5f0e5b9c 100644 --- a/distributed/cli/tests/test_tls_cli.py +++ b/distributed/cli/tests/test_tls_cli.py @@ -2,6 +2,7 @@ from distributed import Client from distributed.metrics import time +from distributed.utils import open_port from distributed.utils_test import ( get_cert, new_config_file, @@ -27,7 +28,7 @@ def wait_for_cores(c, nthreads=1): assert time() < start + 10 -def test_basic(loop): +def test_basic(loop, requires_default_port): with popen(["dask-scheduler", "--no-dashboard"] + tls_args) as s: with popen( ["dask-worker", "--no-dashboard", "tls://127.0.0.1:8786"] + tls_args @@ -39,33 +40,63 @@ def test_basic(loop): def test_nanny(loop): - with popen(["dask-scheduler", "--no-dashboard"] + tls_args) as s: + port = open_port() + with popen( + [ + "dask-scheduler", + "--no-dashboard", + "--port", + str(port), + ] + + tls_args + ) as s: with popen( - ["dask-worker", "--no-dashboard", "--nanny", "tls://127.0.0.1:8786"] + ["dask-worker", "--no-dashboard", "--nanny", f"tls://127.0.0.1:{port}"] + tls_args ) as w: with Client( - "tls://127.0.0.1:8786", loop=loop, security=tls_security() + f"tls://127.0.0.1:{port}", loop=loop, security=tls_security() ) as c: wait_for_cores(c) def test_separate_key_cert(loop): - with popen(["dask-scheduler", "--no-dashboard"] + tls_args_2) as s: + port = open_port() + with popen( + [ + "dask-scheduler", + "--no-dashboard", + "--port", + str(port), + ] + + tls_args_2 + ) as s: with popen( - ["dask-worker", "--no-dashboard", "tls://127.0.0.1:8786"] + tls_args_2 + ["dask-worker", "--no-dashboard", f"tls://127.0.0.1:{port}"] + tls_args_2 ) as w: with Client( - "tls://127.0.0.1:8786", loop=loop, security=tls_security() + f"tls://127.0.0.1:{port}", loop=loop, security=tls_security() ) as c: wait_for_cores(c) def test_use_config_file(loop): + port = open_port() with new_config_file(tls_only_config()): - with popen(["dask-scheduler", "--no-dashboard", "--host", "tls://"]) as s: - with popen(["dask-worker", "--no-dashboard", "tls://127.0.0.1:8786"]) as w: + with popen( + [ + "dask-scheduler", + "--no-dashboard", + "--host", + "tls://", + "--port", + str(port), + ] + ) as s: + with popen( + ["dask-worker", "--no-dashboard", f"tls://127.0.0.1:{port}"] + ) as w: with Client( - "tls://127.0.0.1:8786", loop=loop, security=tls_security() + f"tls://127.0.0.1:{port}", loop=loop, security=tls_security() ) as c: wait_for_cores(c) diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index 97b3885940b..694d68c4b4e 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -16,7 +16,7 @@ from distributed.core import Status from distributed.metrics import time from distributed.system import MEMORY_LIMIT -from distributed.utils import TimeoutError, sync +from distributed.utils import TimeoutError, open_port, sync from distributed.utils_test import ( assert_can_connect_from_everywhere_4, assert_can_connect_from_everywhere_4_6, @@ -153,16 +153,17 @@ def test_transports_tcp(): def test_transports_tcp_port(): + port = open_port() # Scheduler port specified => need TCP with LocalCluster( n_workers=1, processes=False, - scheduler_port=8786, + scheduler_port=port, silence_logs=False, dashboard_address=":0", ) as c: - assert c.scheduler_address == "tcp://127.0.0.1:8786" + assert c.scheduler_address == f"tcp://127.0.0.1:{port}" assert c.workers[0].address.startswith("tcp://") with Client(c.scheduler.address) as e: assert e.submit(inc, 4).result() == 5 @@ -756,6 +757,7 @@ def test_adapt_then_manual(loop): @pytest.mark.parametrize("temporary", [True, False]) def test_local_tls(loop, temporary): + port = open_port() if temporary: xfail_ssl_issue5601() pytest.importorskip("cryptography") @@ -764,7 +766,7 @@ def test_local_tls(loop, temporary): security = tls_only_security() with LocalCluster( n_workers=0, - scheduler_port=8786, + scheduler_port=port, silence_logs=False, security=security, dashboard_address=":0", @@ -833,7 +835,7 @@ def test_local_tls_restart(loop): security = tls_only_security() with LocalCluster( n_workers=1, - scheduler_port=8786, + scheduler_port=open_port(), silence_logs=False, security=security, dashboard_address=":0", diff --git a/distributed/tests/test_preload.py b/distributed/tests/test_preload.py index 17963388afd..0360098ffed 100644 --- a/distributed/tests/test_preload.py +++ b/distributed/tests/test_preload.py @@ -12,6 +12,7 @@ import dask from distributed import Client, Nanny, Scheduler, Worker +from distributed.utils import open_port from distributed.utils_test import captured_logger, cluster, gen_cluster, gen_test PRELOAD_TEXT = """ @@ -204,15 +205,18 @@ async def test_scheduler_startup_nanny(s): @gen_test() async def test_web_preload_worker(): + port = open_port() + data = dedent( + f"""\ + import dask + dask.config.set(scheduler_address="tcp://127.0.0.1:{port}") + """ + ).encode() with mock.patch( "urllib3.PoolManager.request", - **{ - "return_value.data": b"import dask" - b'\ndask.config.set(scheduler_address="tcp://127.0.0.1:8786")' - b"\n" - }, + **{"return_value.data": data}, ) as request: - async with Scheduler(port=8786, host="localhost") as s: + async with Scheduler(port=port, host="localhost") as s: async with Nanny(preload_nanny=["http://example.com/preload"]) as nanny: assert nanny.scheduler_addr == s.address assert request.mock_calls == [ diff --git a/distributed/tests/test_queues.py b/distributed/tests/test_queues.py index 3cc2a17a657..b68d8869c4e 100644 --- a/distributed/tests/test_queues.py +++ b/distributed/tests/test_queues.py @@ -6,6 +6,7 @@ from distributed import Client, Nanny, Queue, TimeoutError, wait, worker_client from distributed.metrics import time +from distributed.utils import open_port from distributed.utils_test import div, gen_cluster, inc, popen @@ -278,11 +279,12 @@ def get(): def test_queue_in_task(loop): + port = open_port() # Ensure that we can create a Queue inside a task on a # worker in a separate Python process than the client with popen(["dask-scheduler", "--no-dashboard"]): - with popen(["dask-worker", "127.0.0.1:8786"]): - with Client("tcp://127.0.0.1:8786", loop=loop) as c: + with popen(["dask-worker", f"127.0.0.1:{port}"]): + with Client(f"tcp://127.0.0.1:{port}", loop=loop) as c: c.wait_for_workers(1) x = Queue("x") diff --git a/distributed/tests/test_variable.py b/distributed/tests/test_variable.py index 9d4eb63cc19..97d0e32ad93 100644 --- a/distributed/tests/test_variable.py +++ b/distributed/tests/test_variable.py @@ -9,6 +9,7 @@ from distributed import Client, Nanny, TimeoutError, Variable, wait, worker_client from distributed.compatibility import WINDOWS from distributed.metrics import monotonic, time +from distributed.utils import open_port from distributed.utils_test import captured_logger, div, gen_cluster, inc, popen @@ -38,11 +39,12 @@ async def test_variable(c, s, a, b): def test_variable_in_task(loop): + port = open_port() # Ensure that we can create a Variable inside a task on a # worker in a separate Python process than the client with popen(["dask-scheduler", "--no-dashboard"]): - with popen(["dask-worker", "127.0.0.1:8786"]): - with Client("tcp://127.0.0.1:8786", loop=loop) as c: + with popen(["dask-worker", f"127.0.0.1:{port}"]): + with Client(f"tcp://127.0.0.1:{port}", loop=loop) as c: c.wait_for_workers(1) x = Variable("x") From 8a3bc7dc02feb59840f2804e1ea7fd1bec1a4a4e Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 22 Jun 2022 12:46:18 +0200 Subject: [PATCH 07/12] More fixes --- distributed/cli/tests/test_dask_scheduler.py | 4 ++-- distributed/cli/tests/test_dask_ssh.py | 4 ++-- distributed/cli/tests/test_tls_cli.py | 2 +- distributed/tests/test_queues.py | 9 ++++++++- distributed/tests/test_variable.py | 2 +- 5 files changed, 14 insertions(+), 7 deletions(-) diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index 487a46f6f17..c16697e7c38 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -324,9 +324,9 @@ def check_scheduler(): f.write(PRELOAD_TEXT) env = os.environ.copy() if "PYTHONPATH" in env: - env["PYTHONPATH"] = tmp_path + ":" + env["PYTHONPATH"] + env["PYTHONPATH"] = str(tmp_path) + ":" + env["PYTHONPATH"] else: - env["PYTHONPATH"] = tmp_path + env["PYTHONPATH"] = str(tmp_path) with tmpfile() as fn: with popen( [ diff --git a/distributed/cli/tests/test_dask_ssh.py b/distributed/cli/tests/test_dask_ssh.py index 49547e059f7..9fc418b7bd8 100644 --- a/distributed/cli/tests/test_dask_ssh.py +++ b/distributed/cli/tests/test_dask_ssh.py @@ -20,7 +20,7 @@ def test_version_option(): @pytest.mark.slow -def test_ssh_cli_nprocs_renamed_to_nworkers(loop, requires_default_port): +def test_ssh_cli_nprocs_renamed_to_nworkers(loop, requires_default_ports): with popen( ["dask-ssh", "--nprocs=2", "--nohost", "localhost"], capture_output=True, @@ -33,7 +33,7 @@ def test_ssh_cli_nprocs_renamed_to_nworkers(loop, requires_default_port): wait_for_log_line(b"renamed to --nworkers", proc.stdout, max_lines=15) -def test_ssh_cli_nworkers_with_nprocs_is_an_error(requires_default_port): +def test_ssh_cli_nworkers_with_nprocs_is_an_error(requires_default_ports): with popen( ["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"], capture_output=True, diff --git a/distributed/cli/tests/test_tls_cli.py b/distributed/cli/tests/test_tls_cli.py index fbd5f0e5b9c..a7dfbc45dcc 100644 --- a/distributed/cli/tests/test_tls_cli.py +++ b/distributed/cli/tests/test_tls_cli.py @@ -28,7 +28,7 @@ def wait_for_cores(c, nthreads=1): assert time() < start + 10 -def test_basic(loop, requires_default_port): +def test_basic(loop, requires_default_ports): with popen(["dask-scheduler", "--no-dashboard"] + tls_args) as s: with popen( ["dask-worker", "--no-dashboard", "tls://127.0.0.1:8786"] + tls_args diff --git a/distributed/tests/test_queues.py b/distributed/tests/test_queues.py index b68d8869c4e..ebd9d335990 100644 --- a/distributed/tests/test_queues.py +++ b/distributed/tests/test_queues.py @@ -282,7 +282,14 @@ def test_queue_in_task(loop): port = open_port() # Ensure that we can create a Queue inside a task on a # worker in a separate Python process than the client - with popen(["dask-scheduler", "--no-dashboard"]): + with popen( + [ + "dask-scheduler", + "--no-dashboard", + "--port", + str(port), + ] + ): with popen(["dask-worker", f"127.0.0.1:{port}"]): with Client(f"tcp://127.0.0.1:{port}", loop=loop) as c: c.wait_for_workers(1) diff --git a/distributed/tests/test_variable.py b/distributed/tests/test_variable.py index 97d0e32ad93..0116a61778c 100644 --- a/distributed/tests/test_variable.py +++ b/distributed/tests/test_variable.py @@ -42,7 +42,7 @@ def test_variable_in_task(loop): port = open_port() # Ensure that we can create a Variable inside a task on a # worker in a separate Python process than the client - with popen(["dask-scheduler", "--no-dashboard"]): + with popen(["dask-scheduler", "--no-dashboard", "--port", str(port)]): with popen(["dask-worker", f"127.0.0.1:{port}"]): with Client(f"tcp://127.0.0.1:{port}", loop=loop) as c: c.wait_for_workers(1) From cf914b11000990090615b865e77074eb26f1aa0b Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 22 Jun 2022 12:48:21 +0200 Subject: [PATCH 08/12] Use contextlib.closing --- distributed/utils_test.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index c227f4688b4..054a5244a25 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -2442,14 +2442,16 @@ def requires_default_ports(name_of_test): @contextmanager def _bind_port(port): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: + with contextlib.closing( + socket.socket( + socket.AF_INET, + socket.SOCK_STREAM, + ) + ) as s: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(("", port)) s.listen(1) yield s - finally: - s.close() default_ports = [8786] From afe006ddd7ba458990b8e4764804db16000d07b1 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 22 Jun 2022 15:34:23 +0200 Subject: [PATCH 09/12] code review --- distributed/cli/tests/test_dask_scheduler.py | 21 +++++++------------- distributed/cli/tests/test_dask_worker.py | 6 ++---- distributed/cli/tests/test_tls_cli.py | 6 ++---- distributed/tests/test_queues.py | 3 +-- distributed/tests/test_worker.py | 4 ++-- distributed/utils.py | 3 +-- distributed/utils_test.py | 7 +------ 7 files changed, 16 insertions(+), 34 deletions(-) diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index c16697e7c38..43b65bee418 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -109,10 +109,8 @@ def test_dashboard_non_standard_ports(loop): with popen( [ "dask-scheduler", - "--port", - str(port1), - "--dashboard-address", - f":{port2}", + f"--port={port1}", + f"--dashboard-address=:{port2}", ] ) as proc: with Client(f"127.0.0.1:{port1}", loop=loop) as c: @@ -141,8 +139,7 @@ def test_dashboard_allowlist(loop): with popen( [ "dask-scheduler", - "--port", - str(port), + f"--port={port}", ] ) as proc: with Client(f"127.0.0.1:{port}", loop=loop) as c: @@ -181,8 +178,7 @@ def test_interface(loop): with popen( [ "dask-scheduler", - "--port", - str(port), + f"--port={port}", "--no-dashboard", "--interface", if_name, @@ -305,8 +301,7 @@ def check_scheduler(): fn, "--preload", path, - "--port", - str(open_port()), + f"--port={open_port()}", ] ): with Client(scheduler_file=fn, loop=loop) as c: @@ -335,8 +330,7 @@ def check_scheduler(): fn, "--preload", "scheduler_info", - "--port", - str(open_port()), + f"--port={open_port()}", ], env=env, ): @@ -358,8 +352,7 @@ def test_preload_remote_module(loop, tmp_path): str(tmp_path / "scheduler-file.json"), "--preload", f"http://localhost:{http_server_port}/scheduler_info.py", - "--port", - str(open_port()), + f"--port={open_port()}", ] ) as proc: with Client( diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index e04fd2a1a92..a61bc5689fb 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -177,10 +177,8 @@ async def test_nanny_worker_ports(c, s): s.address, "--host", "127.0.0.1", - "--worker-port", - str(worker_port), - "--nanny-port", - str(nanny_port), + f"--worker-port={worker_port}", + f"--nanny-port={nanny_port}", "--no-dashboard", ] ): diff --git a/distributed/cli/tests/test_tls_cli.py b/distributed/cli/tests/test_tls_cli.py index a7dfbc45dcc..dba518231ab 100644 --- a/distributed/cli/tests/test_tls_cli.py +++ b/distributed/cli/tests/test_tls_cli.py @@ -45,8 +45,7 @@ def test_nanny(loop): [ "dask-scheduler", "--no-dashboard", - "--port", - str(port), + f"--port={port}", ] + tls_args ) as s: @@ -66,8 +65,7 @@ def test_separate_key_cert(loop): [ "dask-scheduler", "--no-dashboard", - "--port", - str(port), + f"--port={port}", ] + tls_args_2 ) as s: diff --git a/distributed/tests/test_queues.py b/distributed/tests/test_queues.py index ebd9d335990..860b936240d 100644 --- a/distributed/tests/test_queues.py +++ b/distributed/tests/test_queues.py @@ -286,8 +286,7 @@ def test_queue_in_task(loop): [ "dask-scheduler", "--no-dashboard", - "--port", - str(port), + f"--port={port}", ] ): with popen(["dask-worker", f"127.0.0.1:{port}"]): diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 96dfeee7f1c..b1ad5dba87f 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -417,7 +417,7 @@ class MyPlugin: def setup(self, worker=None): raise ValueError("Setup failed") - async with Scheduler(port=0) as s: + async with Scheduler(port=0, dashboard_address=":0") as s: with raises_with_cause( RuntimeError, "Worker failed to start", ValueError, "Setup failed" ): @@ -440,7 +440,7 @@ class MyPlugin2: def setup(self, worker=None): raise RuntimeError("MyPlugin2 Error") - async with Scheduler(port=0) as s: + async with Scheduler(port=0, dashboard_address=":0") as s: # There's no guarantee on the order of which exception is raised first with raises_with_cause( RuntimeError, diff --git a/distributed/utils.py b/distributed/utils.py index a48dc8bc51e..1d4bdbf9ecf 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import contextlib import contextvars import functools import importlib @@ -1059,7 +1058,7 @@ def open_port(host: str = "") -> int: after returning from this function. """ # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python - with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((host, 0)) s.listen(1) port = s.getsockname()[1] diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 054a5244a25..f14eef1b041 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -2442,12 +2442,7 @@ def requires_default_ports(name_of_test): @contextmanager def _bind_port(port): - with contextlib.closing( - socket.socket( - socket.AF_INET, - socket.SOCK_STREAM, - ) - ) as s: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(("", port)) s.listen(1) From 56b007365f514e0a9859d1e787bb7e813b1e75aa Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 22 Jun 2022 19:05:41 +0200 Subject: [PATCH 10/12] Add fix for test_signal_handling --- distributed/cli/tests/test_dask_scheduler.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index 43b65bee418..cbd00c41427 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -530,13 +530,20 @@ def test_multiple_workers(loop): @pytest.mark.skipif(WINDOWS, reason="POSIX only") @pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM]) def test_signal_handling(loop, sig): + port = open_port() with subprocess.Popen( - ["python", "-m", "distributed.cli.dask_scheduler"], + [ + "python", + "-m", + "distributed.cli.dask_scheduler", + f"--port={port}", + "--dashboard-address=:0", + ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) as scheduler: # Wait for scheduler to start - with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop) as c: + with Client(f"127.0.0.1:{port}", loop=loop) as c: pass scheduler.send_signal(sig) stdout, stderr = scheduler.communicate() From edbaac51381c792468676724416d13bd03933e12 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 22 Jun 2022 19:54:53 +0200 Subject: [PATCH 11/12] fix linting --- distributed/cli/tests/test_dask_worker.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index a61bc5689fb..f6f1ac22c68 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -19,13 +19,7 @@ from distributed.deploy.utils import nprocesses_nthreads from distributed.metrics import time from distributed.utils import open_port -from distributed.utils_test import ( - gen_cluster, - inc, - popen, - requires_ipv6, - wait_for_log_line, -) +from distributed.utils_test import gen_cluster, popen, requires_ipv6, wait_for_log_line @pytest.mark.parametrize( From 9620479ed276e8d5dd4c247a8c3938b2990b6c15 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 23 Jun 2022 10:18:35 +0200 Subject: [PATCH 12/12] Fix test_listen_address_ipv6 --- distributed/cli/tests/test_dask_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index f6f1ac22c68..d59a7ae25fb 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -456,11 +456,11 @@ def func(dask_worker): @pytest.mark.slow @requires_ipv6 @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) -@pytest.mark.parametrize("listen_address", ["tcp://:39838", "tcp://[::1]:39838"]) +@pytest.mark.parametrize("listen_address", ["tcp://:", "tcp://[::1]:"]) @gen_cluster(client=True, nthreads=[]) async def test_listen_address_ipv6(c, s, nanny, listen_address): port = open_port() - listen_address += port + listen_address += str(port) with popen( [ "dask-worker",