diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index f9e15607386..d73905401ff 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -63,13 +63,13 @@ def test_no_dashboard(loop): def test_dashboard(loop): pytest.importorskip("bokeh") - with popen(["dask-scheduler"]) as proc: - for line in proc.stderr: + with popen(["dask-scheduler"], flush_output=False) as proc: + for line in proc.stdout: if b"dashboard at" in line: dashboard_port = int(line.decode().split(":")[-1].strip()) break else: - raise Exception("dashboard not found") + assert False # pragma: nocover with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop): pass @@ -217,12 +217,17 @@ def test_scheduler_port_zero(loop): def test_dashboard_port_zero(loop): pytest.importorskip("bokeh") - with popen(["dask-scheduler", "--dashboard-address", ":0"]) as proc: - for line in proc.stderr: + with popen( + ["dask-scheduler", "--dashboard-address", ":0"], + flush_output=False, + ) as proc: + for line in proc.stdout: if b"dashboard at" in line: dashboard_port = int(line.decode().split(":")[-1].strip()) assert dashboard_port != 0 break + else: + assert False # pragma: nocover PRELOAD_TEXT = """ diff --git a/distributed/cli/tests/test_dask_spec.py b/distributed/cli/tests/test_dask_spec.py index 45f88c894b3..06a9de702ac 100644 --- a/distributed/cli/tests/test_dask_spec.py +++ b/distributed/cli/tests/test_dask_spec.py @@ -80,13 +80,17 @@ def test_errors(): '{"foo": "bar"}', "--spec-file", "foo.yaml", - ] + ], + flush_output=False, ) as proc: line = proc.stdout.readline().decode() assert "exactly one" in line assert "--spec" in line and "--spec-file" in line - with popen([sys.executable, "-m", "distributed.cli.dask_spec"]) as proc: + with popen( + [sys.executable, "-m", "distributed.cli.dask_spec"], + flush_output=False, + ) as proc: line = proc.stdout.readline().decode() assert "exactly one" in line assert "--spec" in line and "--spec-file" in line diff --git a/distributed/cli/tests/test_dask_ssh.py b/distributed/cli/tests/test_dask_ssh.py index 055eb754a81..826946d77f8 100644 --- a/distributed/cli/tests/test_dask_ssh.py +++ b/distributed/cli/tests/test_dask_ssh.py @@ -18,23 +18,27 @@ def test_version_option(): assert result.exit_code == 0 +@pytest.mark.slow def test_ssh_cli_nprocs_renamed_to_nworkers(loop): - n_workers = 2 with popen( - ["dask-ssh", f"--nprocs={n_workers}", "--nohost", "localhost"] - ) as cluster: + ["dask-ssh", "--nprocs=2", "--nohost", "localhost"], + flush_output=False, + ) as proc: with Client("tcp://127.0.0.1:8786", timeout="15 seconds", loop=loop) as c: - c.wait_for_workers(n_workers, timeout="15 seconds") + c.wait_for_workers(2, timeout="15 seconds") # This interrupt is necessary for the cluster to place output into the stdout # and stderr pipes - cluster.send_signal(2) - _, stderr = cluster.communicate() - - assert any(b"renamed to --nworkers" in l for l in stderr.splitlines()) + proc.send_signal(2) + assert any( + b"renamed to --nworkers" in proc.stdout.readline() for _ in range(15) + ) def test_ssh_cli_nworkers_with_nprocs_is_an_error(): - with popen(["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"]) as c: + with popen( + ["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"], + flush_output=False, + ) as proc: assert any( - b"Both --nprocs and --nworkers" in c.stderr.readline() for i in range(15) + b"Both --nprocs and --nworkers" in proc.stdout.readline() for _ in range(15) ) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 04e31847609..e38b4d70be0 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -1,15 +1,10 @@ import asyncio - -import pytest -from click.testing import CliRunner - -pytest.importorskip("requests") - import os from multiprocessing import cpu_count from time import sleep -import requests +import pytest +from click.testing import CliRunner from dask.utils import tmpfile @@ -18,127 +13,99 @@ from distributed.compatibility import LINUX, to_thread from distributed.deploy.utils import nprocesses_nthreads from distributed.metrics import time -from distributed.utils import parse_ports, sync from distributed.utils_test import gen_cluster, popen, requires_ipv6 -def test_nanny_worker_ports(loop): - with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]): - with popen( - [ - "dask-worker", - "127.0.0.1:9359", - "--host", - "127.0.0.1", - "--worker-port", - "9684", - "--nanny-port", - "5273", - "--no-dashboard", - ] - ): - with Client("127.0.0.1:9359", loop=loop) as c: - start = time() - while True: - d = sync(c.loop, c.scheduler.identity) - if d["workers"]: - break - else: - assert time() - start < 60 - sleep(0.1) - assert ( - d["workers"]["tcp://127.0.0.1:9684"]["nanny"] - == "tcp://127.0.0.1:5273" - ) +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[]) +async def test_nanny_worker_ports(c, s): + with popen( + [ + "dask-worker", + s.address, + "--host", + "127.0.0.1", + "--worker-port", + "9684", + "--nanny-port", + "5273", + "--no-dashboard", + ] + ): + await c.wait_for_workers(1) + d = await c.scheduler.identity() + assert d["workers"]["tcp://127.0.0.1:9684"]["nanny"] == "tcp://127.0.0.1:5273" @pytest.mark.slow -def test_nanny_worker_port_range(loop): - with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]) as sched: - n_workers = 3 - worker_port = "9684:9686" - nanny_port = "9688:9690" - with popen( - [ - "dask-worker", - "127.0.0.1:9359", - "--nworkers", - f"{n_workers}", - "--host", - "127.0.0.1", - "--worker-port", - worker_port, - "--nanny-port", - nanny_port, - "--no-dashboard", - ] - ): - with Client("127.0.0.1:9359", loop=loop) as c: - start = time() - while len(c.scheduler_info()["workers"]) < n_workers: - sleep(0.1) - assert time() - start < 60 - - def get_port(dask_worker): - return dask_worker.port - - expected_worker_ports = set(parse_ports(worker_port)) - worker_ports = c.run(get_port) - assert set(worker_ports.values()) == expected_worker_ports - - expected_nanny_ports = set(parse_ports(nanny_port)) - nanny_ports = c.run(get_port, nanny=True) - assert set(nanny_ports.values()) == expected_nanny_ports - - -def test_nanny_worker_port_range_too_many_workers_raises(loop): - with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]): - with popen( - [ - "dask-worker", - "127.0.0.1:9359", - "--nworkers", - "3", - "--host", - "127.0.0.1", - "--worker-port", - "9684:9685", - "--nanny-port", - "9686:9687", - "--no-dashboard", - ] - ) as worker: - assert any( - b"Could not start" in worker.stderr.readline() for _ in range(100) - ) - - -def test_memory_limit(loop): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - [ - "dask-worker", - "127.0.0.1:8786", - "--memory-limit", - "2e3MB", - "--no-dashboard", - ] - ): - with Client("127.0.0.1:8786", loop=loop) as c: - while not c.nthreads(): - sleep(0.1) - info = c.scheduler_info() - [d] = info["workers"].values() - assert isinstance(d["memory_limit"], int) - assert d["memory_limit"] == 2e9 +@gen_cluster(client=True, nthreads=[]) +async def test_nanny_worker_port_range(c, s): + with popen( + [ + "dask-worker", + s.address, + "--nworkers", + "3", + "--host", + "127.0.0.1", + "--worker-port", + "9684:9686", + "--nanny-port", + "9688:9690", + "--no-dashboard", + ] + ): + await c.wait_for_workers(3) + worker_ports = await c.run(lambda dask_worker: dask_worker.port) + assert set(worker_ports.values()) == {9684, 9685, 9686} + nanny_ports = await c.run(lambda dask_worker: dask_worker.port, nanny=True) + assert set(nanny_ports.values()) == {9688, 9689, 9690} -def test_no_nanny(loop): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - ["dask-worker", "127.0.0.1:8786", "--no-nanny", "--no-dashboard"] - ) as worker: - assert any(b"Registered" in worker.stderr.readline() for i in range(15)) +@gen_cluster(nthreads=[]) +async def test_nanny_worker_port_range_too_many_workers_raises(s): + with popen( + [ + "dask-worker", + s.address, + "--nworkers", + "3", + "--host", + "127.0.0.1", + "--worker-port", + "9684:9685", + "--nanny-port", + "9686:9687", + "--no-dashboard", + ], + flush_output=False, + ) as worker: + assert any(b"Could not start" in worker.stdout.readline() for _ in range(100)) + + +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[]) +async def test_memory_limit(c, s): + with popen( + [ + "dask-worker", + s.address, + "--memory-limit", + "2e3MB", + "--no-dashboard", + ] + ): + await c.wait_for_workers(1) + info = c.scheduler_info() + (d,) = info["workers"].values() + assert isinstance(d["memory_limit"], int) + assert d["memory_limit"] == 2e9 + + +@gen_cluster(client=True, nthreads=[]) +async def test_no_nanny(c, s): + with popen(["dask-worker", s.address, "--no-nanny", "--no-dashboard"]): + await c.wait_for_workers(1) @pytest.mark.slow @@ -161,7 +128,7 @@ async def test_no_reconnect(c, s, nanny): comm.abort() # worker terminates as soon as the connection is aborted - await to_thread(worker.communicate, timeout=5) + await to_thread(worker.wait, timeout=5) assert worker.returncode == 0 @@ -179,7 +146,7 @@ async def test_reconnect(c, s, nanny): ] ) as worker: # roundtrip works - await c.submit(lambda x: x + 1, 10) == 11 + assert await c.submit(lambda x: x + 1, 10) == 11 (comm,) = s.stream_comms.values() comm.abort() @@ -189,53 +156,49 @@ async def test_reconnect(c, s, nanny): # closing the scheduler cleanly does terminate the worker await s.close() - await to_thread(worker.communicate, timeout=5) + await to_thread(worker.wait, timeout=5) assert worker.returncode == 0 -def test_resources(loop): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - [ - "dask-worker", - "tcp://127.0.0.1:8786", - "--no-dashboard", - "--resources", - "A=1 B=2,C=3", - ] - ): - with Client("127.0.0.1:8786", loop=loop) as c: - while not c.scheduler_info()["workers"]: - sleep(0.1) - info = c.scheduler_info() - worker = list(info["workers"].values())[0] - assert worker["resources"] == {"A": 1, "B": 2, "C": 3} +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[]) +async def test_resources(c, s): + with popen( + [ + "dask-worker", + s.address, + "--no-dashboard", + "--resources", + "A=1 B=2,C=3", + ] + ): + await c.wait_for_workers(1) + info = c.scheduler_info() + (d,) = info["workers"].values() + assert d["resources"] == {"A": 1, "B": 2, "C": 3} +@pytest.mark.slow @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) -def test_local_directory(loop, nanny): - with tmpfile() as fn: - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - [ - "dask-worker", - "127.0.0.1:8786", - nanny, - "--no-dashboard", - "--local-directory", - fn, - ] - ): - with Client("127.0.0.1:8786", loop=loop, timeout=10) as c: - start = time() - while not c.scheduler_info()["workers"]: - sleep(0.1) - assert time() < start + 8 - info = c.scheduler_info() - worker = list(info["workers"].values())[0] - assert worker["local_directory"].startswith(fn) +@gen_cluster(client=True, nthreads=[]) +async def test_local_directory(c, s, nanny, tmpdir): + with popen( + [ + "dask-worker", + s.address, + nanny, + "--no-dashboard", + "--local-directory", + str(tmpdir), + ] + ): + await c.wait_for_workers(1) + info = c.scheduler_info() + (d,) = info["workers"].values() + assert d["local_directory"].startswith(str(tmpdir)) +@pytest.mark.slow @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) def test_scheduler_file(loop, nanny): with tmpfile() as fn: @@ -250,6 +213,7 @@ def test_scheduler_file(loop, nanny): assert time() < start + 10 +@pytest.mark.slow def test_scheduler_address_env(loop, monkeypatch): monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", "tcp://127.0.0.1:8786") with popen(["dask-scheduler", "--no-dashboard"]): @@ -261,166 +225,165 @@ def test_scheduler_address_env(loop, monkeypatch): assert time() < start + 10 -def test_nworkers_requires_nanny(loop): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - ["dask-worker", "127.0.0.1:8786", "--nworkers=2", "--no-nanny"] - ) as worker: - assert any( - b"Failed to launch worker" in worker.stderr.readline() - for i in range(15) - ) +@gen_cluster(nthreads=[]) +async def test_nworkers_requires_nanny(s): + with popen( + ["dask-worker", s.address, "--nworkers=2", "--no-nanny"], + flush_output=False, + ) as worker: + assert any( + b"Failed to launch worker" in worker.stdout.readline() for _ in range(15) + ) -def test_nworkers_negative(loop): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen(["dask-worker", "127.0.0.1:8786", "--nworkers=-1"]): - with Client("tcp://127.0.0.1:8786", loop=loop) as c: - c.wait_for_workers(cpu_count(), timeout="10 seconds") +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[]) +async def test_nworkers_negative(c, s): + with popen(["dask-worker", s.address, "--nworkers=-1"]): + await c.wait_for_workers(cpu_count()) -def test_nworkers_auto(loop): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen(["dask-worker", "127.0.0.1:8786", "--nworkers=auto"]): - with Client("tcp://127.0.0.1:8786", loop=loop) as c: - procs, _ = nprocesses_nthreads() - c.wait_for_workers(procs, timeout="10 seconds") +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[]) +async def test_nworkers_auto(c, s): + with popen(["dask-worker", s.address, "--nworkers=auto"]): + procs, _ = nprocesses_nthreads() + await c.wait_for_workers(procs) -def test_nworkers_expands_name(loop): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen(["dask-worker", "127.0.0.1:8786", "--nworkers", "2", "--name", "0"]): - with popen(["dask-worker", "127.0.0.1:8786", "--nworkers", "2"]): - with Client("tcp://127.0.0.1:8786", loop=loop) as c: - start = time() - while len(c.scheduler_info()["workers"]) < 4: - sleep(0.2) - assert time() < start + 30 +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[]) +async def test_nworkers_expands_name(c, s): + with popen(["dask-worker", s.address, "--nworkers", "2", "--name", "0"]): + with popen(["dask-worker", s.address, "--nworkers", "2"]): + await c.wait_for_workers(4) + info = c.scheduler_info() - info = c.scheduler_info() - names = [d["name"] for d in info["workers"].values()] - foos = [n for n in names if n.startswith("0-")] - assert len(foos) == 2 - assert len(set(names)) == 4 + names = [d["name"] for d in info["workers"].values()] + assert len(names) == len(set(names)) == 4 + zeros = [n for n in names if n.startswith("0-")] + assert len(zeros) == 2 -def test_worker_cli_nprocs_renamed_to_nworkers(loop): - n_workers = 2 - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - ["dask-worker", "127.0.0.1:8786", f"--nprocs={n_workers}"] - ) as worker: - assert any( - b"renamed to --nworkers" in worker.stderr.readline() for i in range(15) - ) - with Client("tcp://127.0.0.1:8786", loop=loop) as c: - c.wait_for_workers(n_workers, timeout="30 seconds") +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[]) +async def test_worker_cli_nprocs_renamed_to_nworkers(c, s): + with popen( + ["dask-worker", s.address, "--nprocs=2"], + flush_output=False, + ) as worker: + await c.wait_for_workers(2) + assert any( + b"renamed to --nworkers" in worker.stdout.readline() for _ in range(15) + ) -def test_worker_cli_nworkers_with_nprocs_is_an_error(): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - ["dask-worker", "127.0.0.1:8786", "--nprocs=2", "--nworkers=2"] - ) as worker: - assert any( - b"Both --nprocs and --nworkers" in worker.stderr.readline() - for i in range(15) - ) +@gen_cluster(nthreads=[]) +async def test_worker_cli_nworkers_with_nprocs_is_an_error(s): + with popen( + ["dask-worker", s.address, "--nprocs=2", "--nworkers=2"], + flush_output=False, + ) as worker: + assert any( + b"Both --nprocs and --nworkers" in worker.stdout.readline() + for _ in range(15) + ) +@pytest.mark.slow @pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @pytest.mark.parametrize( "listen_address", ["tcp://0.0.0.0:39837", "tcp://127.0.0.2:39837"] ) -def test_contact_listen_address(loop, nanny, listen_address): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - [ - "dask-worker", - "127.0.0.1:8786", - nanny, - "--no-dashboard", - "--contact-address", - "tcp://127.0.0.2:39837", - "--listen-address", - listen_address, - ] - ): - with Client("127.0.0.1:8786") as client: - while not client.nthreads(): - sleep(0.1) - info = client.scheduler_info() - assert "tcp://127.0.0.2:39837" in info["workers"] +@gen_cluster(client=True, nthreads=[]) +async def test_contact_listen_address(c, s, nanny, listen_address): + with popen( + [ + "dask-worker", + s.address, + nanny, + "--no-dashboard", + "--contact-address", + "tcp://127.0.0.2:39837", + "--listen-address", + listen_address, + ] + ): + await c.wait_for_workers(1) + info = c.scheduler_info() + assert info["workers"].keys() == {"tcp://127.0.0.2:39837"} - # roundtrip works - assert client.submit(lambda x: x + 1, 10).result() == 11 + # roundtrip works + assert await c.submit(lambda x: x + 1, 10) == 11 - def func(dask_worker): - return dask_worker.listener.listen_address + def func(dask_worker): + return dask_worker.listener.listen_address - assert client.run(func) == {"tcp://127.0.0.2:39837": listen_address} + assert await c.run(func) == {"tcp://127.0.0.2:39837": listen_address} +@pytest.mark.slow @requires_ipv6 @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @pytest.mark.parametrize("listen_address", ["tcp://:39838", "tcp://[::1]:39838"]) -def test_listen_address_ipv6(loop, nanny, listen_address): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - [ - "dask-worker", - "127.0.0.1:8786", - nanny, - "--no-dashboard", - "--listen-address", - listen_address, - ] - ): - # IPv4 used by default for name of global listener; IPv6 used by default when - # listening only on IPv6. - bind_all = "[::1]" not in listen_address - expected_ip = "127.0.0.1" if bind_all else "[::1]" - expected_name = f"tcp://{expected_ip}:39838" - expected_listen = "tcp://0.0.0.0:39838" if bind_all else listen_address - with Client("127.0.0.1:8786") as client: - while not client.nthreads(): - sleep(0.1) - info = client.scheduler_info() - assert expected_name in info["workers"] - assert client.submit(lambda x: x + 1, 10).result() == 11 +@gen_cluster(client=True, nthreads=[]) +async def test_listen_address_ipv6(c, s, nanny, listen_address): + with popen( + [ + "dask-worker", + s.address, + nanny, + "--no-dashboard", + "--listen-address", + listen_address, + ] + ): + # IPv4 used by default for name of global listener; IPv6 used by default when + # listening only on IPv6. + bind_all = "[::1]" not in listen_address + expected_ip = "127.0.0.1" if bind_all else "[::1]" + expected_name = f"tcp://{expected_ip}:39838" + expected_listen = "tcp://0.0.0.0:39838" if bind_all else listen_address - def func(dask_worker): - return dask_worker.listener.listen_address + await c.wait_for_workers(1) + info = c.scheduler_info() + assert info["workers"].keys() == {expected_name} + assert await c.submit(lambda x: x + 1, 10) == 11 + + def func(dask_worker): + return dask_worker.listener.listen_address - assert client.run(func) == {expected_name: expected_listen} + assert await c.run(func) == {expected_name: expected_listen} +@pytest.mark.slow @pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @pytest.mark.parametrize("host", ["127.0.0.2", "0.0.0.0"]) -def test_respect_host_listen_address(loop, nanny, host): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - ["dask-worker", "127.0.0.1:8786", nanny, "--no-dashboard", "--host", host] - ) as worker: - with Client("127.0.0.1:8786") as client: - while not client.nthreads(): - sleep(0.1) - client.scheduler_info() +@gen_cluster(client=True, nthreads=[]) +async def test_respect_host_listen_address(c, s, nanny, host): + with popen(["dask-worker", s.address, nanny, "--no-dashboard", "--host", host]): + await c.wait_for_workers(1) - # roundtrip works - assert client.submit(lambda x: x + 1, 10).result() == 11 + # roundtrip works + assert await c.submit(lambda x: x + 1, 10) == 11 - def func(dask_worker): - return dask_worker.listener.listen_address + def func(dask_worker): + return dask_worker.listener.listen_address - listen_addresses = client.run(func) - assert all(host in v for v in listen_addresses.values()) + listen_addresses = await c.run(func) + assert all(host in v for v in listen_addresses.values()) -def test_dashboard_non_standard_ports(loop): +@pytest.mark.slow +@gen_cluster( + client=True, nthreads=[], scheduler_kwargs={"dashboard_address": "localhost:8787"} +) +async def test_dashboard_non_standard_ports(c, s): pytest.importorskip("bokeh") + requests = pytest.importorskip("requests") + try: import jupyter_server_proxy # noqa: F401 @@ -428,33 +391,27 @@ def test_dashboard_non_standard_ports(loop): except ImportError: proxy_exists = False - with popen(["dask-scheduler", "--port", "3449"]): - with popen( - [ - "dask-worker", - "tcp://127.0.0.1:3449", - "--dashboard-address", - ":4833", - "--host", - "127.0.0.1", - ] - ): - with Client("127.0.0.1:3449", loop=loop) as c: - c.wait_for_workers(1) - pass - - response = requests.get("http://127.0.0.1:4833/status") - assert response.ok - redirect_resp = requests.get("http://127.0.0.1:4833/main") - redirect_resp.ok - # TEST PROXYING WORKS - if proxy_exists: - url = "http://127.0.0.1:8787/proxy/4833/127.0.0.1/status" - response = requests.get(url) - assert response.ok - - with pytest.raises(Exception): - requests.get("http://localhost:4833/status/") + with popen( + [ + "dask-worker", + s.address, + "--dashboard-address", + ":4833", + "--host", + "127.0.0.1", + ] + ): + await c.wait_for_workers(1) + + response = requests.get("http://127.0.0.1:4833/status") + response.raise_for_status() + # TEST PROXYING WORKS + if proxy_exists: + response = requests.get("http://127.0.0.1:8787/proxy/4833/127.0.0.1/status") + response.raise_for_status() + + with pytest.raises(requests.ConnectionError): + requests.get("http://localhost:4833/status/") def test_version_option(): @@ -493,6 +450,7 @@ def test_bokeh_deprecation(): pass +@pytest.mark.slow @gen_cluster(nthreads=[]) async def test_integer_names(s): with popen(["dask-worker", s.address, "--name", "123"]): @@ -502,6 +460,7 @@ async def test_integer_names(s): assert ws.name == 123 +@pytest.mark.slow @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @gen_cluster(client=True, nthreads=[]) async def test_worker_class(c, s, tmp_path, nanny): @@ -543,6 +502,7 @@ def worker_type(dask_worker): assert all(name == "MyWorker" for name in worker_types.values()) +@pytest.mark.slow @gen_cluster(nthreads=[], client=True) async def test_preload_config(c, s): # Ensure dask-worker pulls the preload from the Dask config if @@ -553,13 +513,7 @@ def dask_setup(worker): """ env = os.environ.copy() env["DASK_DISTRIBUTED__WORKER__PRELOAD"] = preload_text - with popen( - [ - "dask-worker", - s.address, - ], - env=env, - ): + with popen(["dask-worker", s.address], env=env): await c.wait_for_workers(1) [foo] = (await c.run(lambda dask_worker: dask_worker.foo)).values() assert foo == "setup" diff --git a/distributed/utils_test.py b/distributed/utils_test.py index e7af52c8997..0001c5f9b54 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import concurrent.futures import copy import functools import gc @@ -1172,9 +1173,27 @@ def _terminate_process(proc): @contextmanager -def popen(args, **kwargs): +def popen(args: list[str], flush_output: bool = True, **kwargs): + """Start a shell command in a subprocess. + Yields a subprocess.Popen object. + + stderr is redirected to stdout. + stdout is redirected to a pipe. + + Parameters + ---------- + args: list[str] + Command line arguments + flush_output: bool, optional + If True (the default), the stdout/stderr pipe is emptied while it is being + filled. Set to False if you wish to read the output yourself. Note that setting + this to False and then failing to periodically read from the pipe may result in + a deadlock due to the pipe getting full. + kwargs: optional + optional arguments to subprocess.Popen + """ kwargs["stdout"] = subprocess.PIPE - kwargs["stderr"] = subprocess.PIPE + kwargs["stderr"] = subprocess.STDOUT if sys.platform.startswith("win"): # Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP @@ -1188,9 +1207,16 @@ def popen(args, **kwargs): os.environ.get("DESTDIR", "") + sys.prefix, "bin", args[0] ) proc = subprocess.Popen(args, **kwargs) + + if flush_output: + ex = concurrent.futures.ThreadPoolExecutor(1) + flush_future = ex.submit(proc.communicate) + try: yield proc - except Exception: + + # asyncio.CancelledError is raised by @gen_test/@gen_cluster timeout + except (Exception, asyncio.CancelledError): dump_stdout = True raise @@ -1199,13 +1225,17 @@ def popen(args, **kwargs): _terminate_process(proc) finally: # XXX Also dump stdout if return code != 0 ? - out, err = proc.communicate() - if dump_stdout: - print("\n\nPrint from stderr\n %s\n=================\n" % args[0][0]) - print(err.decode()) + if flush_output: + out, err = flush_future.result() + ex.shutdown() + else: + out, err = proc.communicate() + assert not err - print("\n\nPrint from stdout\n=================\n") - print(out.decode()) + if dump_stdout: + print("\n" + "-" * 27 + " Subprocess stdout/stderr" + "-" * 27) + print(out.decode().rstrip()) + print("-" * 80) def wait_for(predicate, timeout, fail_func=None, period=0.001):