From 68fbbcccd851d9763fbcf3215892f88e8a6ea2b4 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 17 Mar 2022 17:32:27 +0000 Subject: [PATCH 1/7] Overhaul test_dask_worker + fix popen logging --- .github/workflows/tests.yaml | 19 +- distributed/cli/tests/test_dask_worker.py | 570 ++++++++++------------ distributed/utils_test.py | 4 +- 3 files changed, 265 insertions(+), 328 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 7e087f99757..8e554e18aab 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -22,16 +22,8 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest, windows-latest, macos-latest] - python-version: ["3.8", "3.9"] - # Cherry-pick test modules to split the overall runtime roughly in half - partition: [ci1, not ci1] - include: - - partition: "ci1" - partition-label: "ci1" - - partition: "not ci1" - partition-label: "notci1" - + os: [ubuntu-latest, windows-latest] + python-version: ["3.8"] # Uncomment to stress-test the test suite for random failures. # Must also change env.TEST_ID below. # This will take a LONG time and delay all PRs across the whole github.com/dask! @@ -41,8 +33,7 @@ jobs: # run: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] env: - TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }} - # TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}-${{ matrix.run }} + TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }} steps: - name: Checkout source @@ -129,8 +120,8 @@ jobs: set -o pipefail mkdir reports - pytest distributed \ - -m "not avoid_ci and ${{ matrix.partition }}" --runslow \ + pytest distributed/cli/tests/test_dask_worker.py \ + -m "not avoid_ci" --runslow \ --leaks=fds,processes,threads \ --junitxml reports/pytest.xml -o junit_suite_name=$TEST_ID \ --cov=distributed --cov-report=xml \ diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 04e31847609..719f4d121ff 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -1,15 +1,10 @@ import asyncio - -import pytest -from click.testing import CliRunner - -pytest.importorskip("requests") - import os from multiprocessing import cpu_count from time import sleep -import requests +import pytest +from click.testing import CliRunner from dask.utils import tmpfile @@ -18,127 +13,98 @@ from distributed.compatibility import LINUX, to_thread from distributed.deploy.utils import nprocesses_nthreads from distributed.metrics import time -from distributed.utils import parse_ports, sync from distributed.utils_test import gen_cluster, popen, requires_ipv6 -def test_nanny_worker_ports(loop): - with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]): - with popen( - [ - "dask-worker", - "127.0.0.1:9359", - "--host", - "127.0.0.1", - "--worker-port", - "9684", - "--nanny-port", - "5273", - "--no-dashboard", - ] - ): - with Client("127.0.0.1:9359", loop=loop) as c: - start = time() - while True: - d = sync(c.loop, c.scheduler.identity) - if d["workers"]: - break - else: - assert time() - start < 60 - sleep(0.1) - assert ( - d["workers"]["tcp://127.0.0.1:9684"]["nanny"] - == "tcp://127.0.0.1:5273" - ) +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[]) +async def test_nanny_worker_ports(c, s): + with popen( + [ + "dask-worker", + s.address, + "--host", + "127.0.0.1", + "--worker-port", + "9684", + "--nanny-port", + "5273", + "--no-dashboard", + ] + ): + await c.wait_for_workers(1) + d = await c.scheduler.identity() + assert d["workers"]["tcp://127.0.0.1:9684"]["nanny"] == "tcp://127.0.0.1:5273" @pytest.mark.slow -def test_nanny_worker_port_range(loop): - with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]) as sched: - n_workers = 3 - worker_port = "9684:9686" - nanny_port = "9688:9690" - with popen( - [ - "dask-worker", - "127.0.0.1:9359", - "--nworkers", - f"{n_workers}", - "--host", - "127.0.0.1", - "--worker-port", - worker_port, - "--nanny-port", - nanny_port, - "--no-dashboard", - ] - ): - with Client("127.0.0.1:9359", loop=loop) as c: - start = time() - while len(c.scheduler_info()["workers"]) < n_workers: - sleep(0.1) - assert time() - start < 60 - - def get_port(dask_worker): - return dask_worker.port - - expected_worker_ports = set(parse_ports(worker_port)) - worker_ports = c.run(get_port) - assert set(worker_ports.values()) == expected_worker_ports - - expected_nanny_ports = set(parse_ports(nanny_port)) - nanny_ports = c.run(get_port, nanny=True) - assert set(nanny_ports.values()) == expected_nanny_ports - - -def test_nanny_worker_port_range_too_many_workers_raises(loop): - with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]): - with popen( - [ - "dask-worker", - "127.0.0.1:9359", - "--nworkers", - "3", - "--host", - "127.0.0.1", - "--worker-port", - "9684:9685", - "--nanny-port", - "9686:9687", - "--no-dashboard", - ] - ) as worker: - assert any( - b"Could not start" in worker.stderr.readline() for _ in range(100) - ) - - -def test_memory_limit(loop): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - [ - "dask-worker", - "127.0.0.1:8786", - "--memory-limit", - "2e3MB", - "--no-dashboard", - ] - ): - with Client("127.0.0.1:8786", loop=loop) as c: - while not c.nthreads(): - sleep(0.1) - info = c.scheduler_info() - [d] = info["workers"].values() - assert isinstance(d["memory_limit"], int) - assert d["memory_limit"] == 2e9 +@gen_cluster(client=True, nthreads=[]) +async def test_nanny_worker_port_range(c, s): + with popen( + [ + "dask-worker", + s.address, + "--nworkers", + "3", + "--host", + "127.0.0.1", + "--worker-port", + "9684:9686", + "--nanny-port", + "9688:9690", + "--no-dashboard", + ] + ): + await c.wait_for_workers(3) + worker_ports = await c.run(lambda dask_worker: dask_worker.port) + assert set(worker_ports.values()) == {9684, 9685, 9686} + nanny_ports = await c.run(lambda dask_worker: dask_worker.port, nanny=True) + assert set(nanny_ports.values()) == {9688, 9689, 9690} -def test_no_nanny(loop): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - ["dask-worker", "127.0.0.1:8786", "--no-nanny", "--no-dashboard"] - ) as worker: - assert any(b"Registered" in worker.stderr.readline() for i in range(15)) +@gen_cluster(nthreads=[]) +async def test_nanny_worker_port_range_too_many_workers_raises(s): + with popen( + [ + "dask-worker", + s.address, + "--nworkers", + "3", + "--host", + "127.0.0.1", + "--worker-port", + "9684:9685", + "--nanny-port", + "9686:9687", + "--no-dashboard", + ] + ) as worker: + assert any(b"Could not start" in worker.stderr.readline() for _ in range(100)) + + +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[]) +async def test_memory_limit(c, s): + with popen( + [ + "dask-worker", + s.address, + "--memory-limit", + "2e3MB", + "--no-dashboard", + ] + ): + await c.wait_for_workers(1) + info = c.scheduler_info() + (d,) = info["workers"].values() + assert isinstance(d["memory_limit"], int) + assert d["memory_limit"] == 2e9 + + +@gen_cluster(client=True, nthreads=[]) +async def test_no_nanny(c, s): + with popen(["dask-worker", s.address, "--no-nanny", "--no-dashboard"]): + await c.wait_for_workers(1) @pytest.mark.slow @@ -193,49 +159,45 @@ async def test_reconnect(c, s, nanny): assert worker.returncode == 0 -def test_resources(loop): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - [ - "dask-worker", - "tcp://127.0.0.1:8786", - "--no-dashboard", - "--resources", - "A=1 B=2,C=3", - ] - ): - with Client("127.0.0.1:8786", loop=loop) as c: - while not c.scheduler_info()["workers"]: - sleep(0.1) - info = c.scheduler_info() - worker = list(info["workers"].values())[0] - assert worker["resources"] == {"A": 1, "B": 2, "C": 3} +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[]) +async def test_resources(c, s): + with popen( + [ + "dask-worker", + s.address, + "--no-dashboard", + "--resources", + "A=1 B=2,C=3", + ] + ): + await c.wait_for_workers(1) + info = c.scheduler_info() + (d,) = info["workers"].values() + assert d["resources"] == {"A": 1, "B": 2, "C": 3} +@pytest.mark.slow @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) -def test_local_directory(loop, nanny): - with tmpfile() as fn: - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - [ - "dask-worker", - "127.0.0.1:8786", - nanny, - "--no-dashboard", - "--local-directory", - fn, - ] - ): - with Client("127.0.0.1:8786", loop=loop, timeout=10) as c: - start = time() - while not c.scheduler_info()["workers"]: - sleep(0.1) - assert time() < start + 8 - info = c.scheduler_info() - worker = list(info["workers"].values())[0] - assert worker["local_directory"].startswith(fn) +@gen_cluster(client=True, nthreads=[]) +async def test_local_directory(c, s, nanny, tmpdir): + with popen( + [ + "dask-worker", + s.address, + nanny, + "--no-dashboard", + "--local-directory", + str(tmpdir), + ] + ): + await c.wait_for_workers(1) + info = c.scheduler_info() + (d,) = info["workers"].values() + assert d["local_directory"].startswith(str(tmpdir)) +@pytest.mark.slow @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) def test_scheduler_file(loop, nanny): with tmpfile() as fn: @@ -250,6 +212,7 @@ def test_scheduler_file(loop, nanny): assert time() < start + 10 +@pytest.mark.slow def test_scheduler_address_env(loop, monkeypatch): monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", "tcp://127.0.0.1:8786") with popen(["dask-scheduler", "--no-dashboard"]): @@ -261,166 +224,156 @@ def test_scheduler_address_env(loop, monkeypatch): assert time() < start + 10 -def test_nworkers_requires_nanny(loop): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - ["dask-worker", "127.0.0.1:8786", "--nworkers=2", "--no-nanny"] - ) as worker: - assert any( - b"Failed to launch worker" in worker.stderr.readline() - for i in range(15) - ) +@gen_cluster(nthreads=[]) +async def test_nworkers_requires_nanny(s): + with popen(["dask-worker", s.address, "--nworkers=2", "--no-nanny"]) as worker: + assert any( + b"Failed to launch worker" in worker.stderr.readline() for i in range(15) + ) -def test_nworkers_negative(loop): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen(["dask-worker", "127.0.0.1:8786", "--nworkers=-1"]): - with Client("tcp://127.0.0.1:8786", loop=loop) as c: - c.wait_for_workers(cpu_count(), timeout="10 seconds") +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[]) +async def test_nworkers_negative(c, s): + with popen(["dask-worker", s.address, "--nworkers=-1"]): + await c.wait_for_workers(cpu_count()) -def test_nworkers_auto(loop): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen(["dask-worker", "127.0.0.1:8786", "--nworkers=auto"]): - with Client("tcp://127.0.0.1:8786", loop=loop) as c: - procs, _ = nprocesses_nthreads() - c.wait_for_workers(procs, timeout="10 seconds") +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[]) +async def test_nworkers_auto(c, s): + with popen(["dask-worker", s.address, "--nworkers=auto"]): + procs, _ = nprocesses_nthreads() + await c.wait_for_workers(procs) -def test_nworkers_expands_name(loop): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen(["dask-worker", "127.0.0.1:8786", "--nworkers", "2", "--name", "0"]): - with popen(["dask-worker", "127.0.0.1:8786", "--nworkers", "2"]): - with Client("tcp://127.0.0.1:8786", loop=loop) as c: - start = time() - while len(c.scheduler_info()["workers"]) < 4: - sleep(0.2) - assert time() < start + 30 +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[]) +async def test_nworkers_expands_name(c, s): + with popen(["dask-worker", s.address, "--nworkers", "2", "--name", "0"]): + with popen(["dask-worker", s.address, "--nworkers", "2"]): + await c.wait_for_workers(4) + info = c.scheduler_info() - info = c.scheduler_info() - names = [d["name"] for d in info["workers"].values()] - foos = [n for n in names if n.startswith("0-")] - assert len(foos) == 2 - assert len(set(names)) == 4 + names = [d["name"] for d in info["workers"].values()] + assert len(names) == len(set(names)) == 4 + zeros = [n for n in names if n.startswith("0-")] + assert len(zeros) == 2 -def test_worker_cli_nprocs_renamed_to_nworkers(loop): - n_workers = 2 - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - ["dask-worker", "127.0.0.1:8786", f"--nprocs={n_workers}"] - ) as worker: - assert any( - b"renamed to --nworkers" in worker.stderr.readline() for i in range(15) - ) - with Client("tcp://127.0.0.1:8786", loop=loop) as c: - c.wait_for_workers(n_workers, timeout="30 seconds") +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[]) +async def test_worker_cli_nprocs_renamed_to_nworkers(c, s): + with popen(["dask-worker", s.address, "--nprocs=2"]) as worker: + await c.wait_for_workers(2) + assert any( + b"renamed to --nworkers" in worker.stderr.readline() for _ in range(15) + ) -def test_worker_cli_nworkers_with_nprocs_is_an_error(): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - ["dask-worker", "127.0.0.1:8786", "--nprocs=2", "--nworkers=2"] - ) as worker: - assert any( - b"Both --nprocs and --nworkers" in worker.stderr.readline() - for i in range(15) - ) +@gen_cluster(nthreads=[]) +async def test_worker_cli_nworkers_with_nprocs_is_an_error(s): + with popen(["dask-worker", s.address, "--nprocs=2", "--nworkers=2"]) as worker: + assert any( + b"Both --nprocs and --nworkers" in worker.stderr.readline() + for _ in range(15) + ) +@pytest.mark.slow @pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @pytest.mark.parametrize( "listen_address", ["tcp://0.0.0.0:39837", "tcp://127.0.0.2:39837"] ) -def test_contact_listen_address(loop, nanny, listen_address): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - [ - "dask-worker", - "127.0.0.1:8786", - nanny, - "--no-dashboard", - "--contact-address", - "tcp://127.0.0.2:39837", - "--listen-address", - listen_address, - ] - ): - with Client("127.0.0.1:8786") as client: - while not client.nthreads(): - sleep(0.1) - info = client.scheduler_info() - assert "tcp://127.0.0.2:39837" in info["workers"] +@gen_cluster(client=True, nthreads=[]) +async def test_contact_listen_address(c, s, nanny, listen_address): + with popen( + [ + "dask-worker", + s.address, + nanny, + "--no-dashboard", + "--contact-address", + "tcp://127.0.0.2:39837", + "--listen-address", + listen_address, + ] + ): + await c.wait_for_workers(1) + info = c.scheduler_info() + assert info["workers"].keys() == {"tcp://127.0.0.2:39837"} - # roundtrip works - assert client.submit(lambda x: x + 1, 10).result() == 11 + # roundtrip works + assert await c.submit(lambda x: x + 1, 10) == 11 - def func(dask_worker): - return dask_worker.listener.listen_address + def func(dask_worker): + return dask_worker.listener.listen_address - assert client.run(func) == {"tcp://127.0.0.2:39837": listen_address} + assert await c.run(func) == {"tcp://127.0.0.2:39837": listen_address} +@pytest.mark.slow @requires_ipv6 @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @pytest.mark.parametrize("listen_address", ["tcp://:39838", "tcp://[::1]:39838"]) -def test_listen_address_ipv6(loop, nanny, listen_address): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - [ - "dask-worker", - "127.0.0.1:8786", - nanny, - "--no-dashboard", - "--listen-address", - listen_address, - ] - ): - # IPv4 used by default for name of global listener; IPv6 used by default when - # listening only on IPv6. - bind_all = "[::1]" not in listen_address - expected_ip = "127.0.0.1" if bind_all else "[::1]" - expected_name = f"tcp://{expected_ip}:39838" - expected_listen = "tcp://0.0.0.0:39838" if bind_all else listen_address - with Client("127.0.0.1:8786") as client: - while not client.nthreads(): - sleep(0.1) - info = client.scheduler_info() - assert expected_name in info["workers"] - assert client.submit(lambda x: x + 1, 10).result() == 11 +@gen_cluster(client=True, nthreads=[]) +async def test_listen_address_ipv6(c, s, nanny, listen_address): + with popen( + [ + "dask-worker", + s.address, + nanny, + "--no-dashboard", + "--listen-address", + listen_address, + ] + ): + # IPv4 used by default for name of global listener; IPv6 used by default when + # listening only on IPv6. + bind_all = "[::1]" not in listen_address + expected_ip = "127.0.0.1" if bind_all else "[::1]" + expected_name = f"tcp://{expected_ip}:39838" + expected_listen = "tcp://0.0.0.0:39838" if bind_all else listen_address - def func(dask_worker): - return dask_worker.listener.listen_address + await c.wait_for_workers(1) + info = c.scheduler_info() + assert info["workers"].keys() == {expected_name} + assert await c.submit(lambda x: x + 1, 10) == 11 - assert client.run(func) == {expected_name: expected_listen} + def func(dask_worker): + return dask_worker.listener.listen_address + assert await c.run(func) == {expected_name: expected_listen} + +@pytest.mark.slow @pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @pytest.mark.parametrize("host", ["127.0.0.2", "0.0.0.0"]) -def test_respect_host_listen_address(loop, nanny, host): - with popen(["dask-scheduler", "--no-dashboard"]): - with popen( - ["dask-worker", "127.0.0.1:8786", nanny, "--no-dashboard", "--host", host] - ) as worker: - with Client("127.0.0.1:8786") as client: - while not client.nthreads(): - sleep(0.1) - client.scheduler_info() +@gen_cluster(client=True, nthreads=[]) +async def test_respect_host_listen_address(c, s, nanny, host): + with popen(["dask-worker", s.address, nanny, "--no-dashboard", "--host", host]): + await c.wait_for_workers(1) - # roundtrip works - assert client.submit(lambda x: x + 1, 10).result() == 11 + # roundtrip works + assert await c.submit(lambda x: x + 1, 10) == 11 - def func(dask_worker): - return dask_worker.listener.listen_address + def func(dask_worker): + return dask_worker.listener.listen_address - listen_addresses = client.run(func) - assert all(host in v for v in listen_addresses.values()) + listen_addresses = await c.run(func) + assert all(host in v for v in listen_addresses.values()) -def test_dashboard_non_standard_ports(loop): +@pytest.mark.slow +@gen_cluster( + client=True, nthreads=[], scheduler_kwargs={"dashboard_address": "localhost:8787"} +) +async def test_dashboard_non_standard_ports(c, s): pytest.importorskip("bokeh") + requests = pytest.importorskip("requests") + try: import jupyter_server_proxy # noqa: F401 @@ -428,33 +381,27 @@ def test_dashboard_non_standard_ports(loop): except ImportError: proxy_exists = False - with popen(["dask-scheduler", "--port", "3449"]): - with popen( - [ - "dask-worker", - "tcp://127.0.0.1:3449", - "--dashboard-address", - ":4833", - "--host", - "127.0.0.1", - ] - ): - with Client("127.0.0.1:3449", loop=loop) as c: - c.wait_for_workers(1) - pass - - response = requests.get("http://127.0.0.1:4833/status") - assert response.ok - redirect_resp = requests.get("http://127.0.0.1:4833/main") - redirect_resp.ok - # TEST PROXYING WORKS - if proxy_exists: - url = "http://127.0.0.1:8787/proxy/4833/127.0.0.1/status" - response = requests.get(url) - assert response.ok - - with pytest.raises(Exception): - requests.get("http://localhost:4833/status/") + with popen( + [ + "dask-worker", + s.address, + "--dashboard-address", + ":4833", + "--host", + "127.0.0.1", + ] + ): + await c.wait_for_workers(1) + + response = requests.get("http://127.0.0.1:4833/status") + response.raise_for_status() + # TEST PROXYING WORKS + if proxy_exists: + response = requests.get("http://127.0.0.1:8787/proxy/4833/127.0.0.1/status") + response.raise_for_status() + + with pytest.raises(requests.ConnectionError): + requests.get("http://localhost:4833/status/") def test_version_option(): @@ -493,6 +440,7 @@ def test_bokeh_deprecation(): pass +@pytest.mark.slow @gen_cluster(nthreads=[]) async def test_integer_names(s): with popen(["dask-worker", s.address, "--name", "123"]): @@ -502,6 +450,7 @@ async def test_integer_names(s): assert ws.name == 123 +@pytest.mark.slow @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @gen_cluster(client=True, nthreads=[]) async def test_worker_class(c, s, tmp_path, nanny): @@ -543,6 +492,7 @@ def worker_type(dask_worker): assert all(name == "MyWorker" for name in worker_types.values()) +@pytest.mark.slow @gen_cluster(nthreads=[], client=True) async def test_preload_config(c, s): # Ensure dask-worker pulls the preload from the Dask config if @@ -553,13 +503,7 @@ def dask_setup(worker): """ env = os.environ.copy() env["DASK_DISTRIBUTED__WORKER__PRELOAD"] = preload_text - with popen( - [ - "dask-worker", - s.address, - ], - env=env, - ): + with popen(["dask-worker", s.address], env=env): await c.wait_for_workers(1) [foo] = (await c.run(lambda dask_worker: dask_worker.foo)).values() assert foo == "setup" diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 3a4a31e7263..ca2fed68b66 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1191,7 +1191,9 @@ def popen(args, **kwargs): proc = subprocess.Popen(args, **kwargs) try: yield proc - except Exception: + + # asyncio.CancelledError is raised by @gen_test/@gen_cluster timeout + except (Exception, asyncio.CancelledError): dump_stdout = True raise From 0a28871142b1a9b5c8c2ea37891b63882732253e Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 18 Mar 2022 15:08:44 +0000 Subject: [PATCH 2/7] Asynchronously flush stdout/stderr --- distributed/cli/tests/test_dask_spec.py | 8 +++-- distributed/cli/tests/test_dask_ssh.py | 25 ++++++------- distributed/cli/tests/test_dask_worker.py | 34 ++++++++++++------ distributed/utils_test.py | 44 ++++++++++++++++++----- 4 files changed, 78 insertions(+), 33 deletions(-) diff --git a/distributed/cli/tests/test_dask_spec.py b/distributed/cli/tests/test_dask_spec.py index 45f88c894b3..06a9de702ac 100644 --- a/distributed/cli/tests/test_dask_spec.py +++ b/distributed/cli/tests/test_dask_spec.py @@ -80,13 +80,17 @@ def test_errors(): '{"foo": "bar"}', "--spec-file", "foo.yaml", - ] + ], + flush_output=False, ) as proc: line = proc.stdout.readline().decode() assert "exactly one" in line assert "--spec" in line and "--spec-file" in line - with popen([sys.executable, "-m", "distributed.cli.dask_spec"]) as proc: + with popen( + [sys.executable, "-m", "distributed.cli.dask_spec"], + flush_output=False, + ) as proc: line = proc.stdout.readline().decode() assert "exactly one" in line assert "--spec" in line and "--spec-file" in line diff --git a/distributed/cli/tests/test_dask_ssh.py b/distributed/cli/tests/test_dask_ssh.py index 055eb754a81..62c85896e55 100644 --- a/distributed/cli/tests/test_dask_ssh.py +++ b/distributed/cli/tests/test_dask_ssh.py @@ -21,20 +21,21 @@ def test_version_option(): def test_ssh_cli_nprocs_renamed_to_nworkers(loop): n_workers = 2 with popen( - ["dask-ssh", f"--nprocs={n_workers}", "--nohost", "localhost"] - ) as cluster: - with Client("tcp://127.0.0.1:8786", timeout="15 seconds", loop=loop) as c: - c.wait_for_workers(n_workers, timeout="15 seconds") - # This interrupt is necessary for the cluster to place output into the stdout - # and stderr pipes - cluster.send_signal(2) - _, stderr = cluster.communicate() - - assert any(b"renamed to --nworkers" in l for l in stderr.splitlines()) + ["dask-ssh", f"--nprocs={n_workers}", "--nohost", "localhost"], + flush_output=False, + ) as proc: + with Client("tcp://127.0.0.1:8786", loop=loop) as c: + c.wait_for_workers(n_workers) + assert any( + b"renamed to --nworkers" in proc.stdout.readline() for _ in range(15) + ) def test_ssh_cli_nworkers_with_nprocs_is_an_error(): - with popen(["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"]) as c: + with popen( + ["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"], + flush_output=False, + ) as proc: assert any( - b"Both --nprocs and --nworkers" in c.stderr.readline() for i in range(15) + b"Both --nprocs and --nworkers" in proc.stdout.readline() for _ in range(15) ) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 719f4d121ff..af48c6da1f2 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -38,7 +38,7 @@ async def test_nanny_worker_ports(c, s): @pytest.mark.slow -@gen_cluster(client=True, nthreads=[]) +@gen_cluster(client=True, nthreads=[], timeout=1800) async def test_nanny_worker_port_range(c, s): with popen( [ @@ -77,9 +77,10 @@ async def test_nanny_worker_port_range_too_many_workers_raises(s): "--nanny-port", "9686:9687", "--no-dashboard", - ] + ], + flush_output=False, ) as worker: - assert any(b"Could not start" in worker.stderr.readline() for _ in range(100)) + assert any(b"Could not start" in worker.stdout.readline() for _ in range(100)) @pytest.mark.slow @@ -118,7 +119,8 @@ async def test_no_reconnect(c, s, nanny): "--no-reconnect", nanny, "--no-dashboard", - ] + ], + flush_output=False, ) as worker: # roundtrip works assert await c.submit(lambda x: x + 1, 10) == 11 @@ -142,7 +144,8 @@ async def test_reconnect(c, s, nanny): "--reconnect", nanny, "--no-dashboard", - ] + ], + flush_output=False, ) as worker: # roundtrip works await c.submit(lambda x: x + 1, 10) == 11 @@ -226,9 +229,12 @@ def test_scheduler_address_env(loop, monkeypatch): @gen_cluster(nthreads=[]) async def test_nworkers_requires_nanny(s): - with popen(["dask-worker", s.address, "--nworkers=2", "--no-nanny"]) as worker: + with popen( + ["dask-worker", s.address, "--nworkers=2", "--no-nanny"], + flush_output=False, + ) as worker: assert any( - b"Failed to launch worker" in worker.stderr.readline() for i in range(15) + b"Failed to launch worker" in worker.stdout.readline() for _ in range(15) ) @@ -264,18 +270,24 @@ async def test_nworkers_expands_name(c, s): @pytest.mark.slow @gen_cluster(client=True, nthreads=[]) async def test_worker_cli_nprocs_renamed_to_nworkers(c, s): - with popen(["dask-worker", s.address, "--nprocs=2"]) as worker: + with popen( + ["dask-worker", s.address, "--nprocs=2"], + flush_output=False, + ) as worker: await c.wait_for_workers(2) assert any( - b"renamed to --nworkers" in worker.stderr.readline() for _ in range(15) + b"renamed to --nworkers" in worker.stdout.readline() for _ in range(15) ) @gen_cluster(nthreads=[]) async def test_worker_cli_nworkers_with_nprocs_is_an_error(s): - with popen(["dask-worker", s.address, "--nprocs=2", "--nworkers=2"]) as worker: + with popen( + ["dask-worker", s.address, "--nprocs=2", "--nworkers=2"], + flush_output=False, + ) as worker: assert any( - b"Both --nprocs and --nworkers" in worker.stderr.readline() + b"Both --nprocs and --nworkers" in worker.stdout.readline() for _ in range(15) ) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index ca2fed68b66..e078507218c 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import concurrent.futures import copy import functools import gc @@ -1173,9 +1174,27 @@ def _terminate_process(proc): @contextmanager -def popen(args, **kwargs): +def popen(args: list[str], flush_output: bool = True, **kwargs): + """Start a shell command in a subprocess. + Yields a subprocess.Popen object. + + stderr is redirected to stdout. + stdout is redirected to a pipe. + + Parameters + ---------- + args: list[str] + Command line arguments + flush_output: bool, optional + If True (the default), the stdout/stderr pipe is emptied while it is being + filled. Set to False if you wish to read the output yourself. Note that setting + this to False and then failing to periodically read from the pipe may result in + a deadlock due to the pipe getting full. + kwargs: optional + optional arguments to subprocess.Popen + """ kwargs["stdout"] = subprocess.PIPE - kwargs["stderr"] = subprocess.PIPE + kwargs["stderr"] = subprocess.STDOUT if sys.platform.startswith("win"): # Allow using CTRL_C_EVENT / CTRL_BREAK_EVENT kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP @@ -1189,6 +1208,11 @@ def popen(args, **kwargs): os.environ.get("DESTDIR", "") + sys.prefix, "bin", args[0] ) proc = subprocess.Popen(args, **kwargs) + + if flush_output: + ex = concurrent.futures.ThreadPoolExecutor(1) + flush_future = ex.submit(proc.communicate) + try: yield proc @@ -1202,13 +1226,17 @@ def popen(args, **kwargs): _terminate_process(proc) finally: # XXX Also dump stdout if return code != 0 ? - out, err = proc.communicate() - if dump_stdout: - print("\n\nPrint from stderr\n %s\n=================\n" % args[0][0]) - print(err.decode()) + if flush_output: + out, err = flush_future.result() + ex.shutdown() + else: + out, err = proc.communicate() + assert not err - print("\n\nPrint from stdout\n=================\n") - print(out.decode()) + if dump_stdout: + print("\n" + "-" * 27 + " Subprocess stdout/stderr" + "-" * 27) + print(out.decode().rstrip()) + print("-" * 80) def wait_for(predicate, timeout, fail_func=None, period=0.001): From 8a04f051bcbd80e09424df99c268645f29dc9348 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 18 Mar 2022 15:10:25 +0000 Subject: [PATCH 3/7] revert --- .github/workflows/tests.yaml | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 8e554e18aab..7e087f99757 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -22,8 +22,16 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest, windows-latest] - python-version: ["3.8"] + os: [ubuntu-latest, windows-latest, macos-latest] + python-version: ["3.8", "3.9"] + # Cherry-pick test modules to split the overall runtime roughly in half + partition: [ci1, not ci1] + include: + - partition: "ci1" + partition-label: "ci1" + - partition: "not ci1" + partition-label: "notci1" + # Uncomment to stress-test the test suite for random failures. # Must also change env.TEST_ID below. # This will take a LONG time and delay all PRs across the whole github.com/dask! @@ -33,7 +41,8 @@ jobs: # run: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] env: - TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }} + TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }} + # TEST_ID: ${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.partition-label }}-${{ matrix.run }} steps: - name: Checkout source @@ -120,8 +129,8 @@ jobs: set -o pipefail mkdir reports - pytest distributed/cli/tests/test_dask_worker.py \ - -m "not avoid_ci" --runslow \ + pytest distributed \ + -m "not avoid_ci and ${{ matrix.partition }}" --runslow \ --leaks=fds,processes,threads \ --junitxml reports/pytest.xml -o junit_suite_name=$TEST_ID \ --cov=distributed --cov-report=xml \ From 01e117484d3928daa7dbcea1239861000a749002 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 18 Mar 2022 15:14:28 +0000 Subject: [PATCH 4/7] cleanup --- distributed/cli/tests/test_dask_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index af48c6da1f2..be24fda9d46 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -38,7 +38,7 @@ async def test_nanny_worker_ports(c, s): @pytest.mark.slow -@gen_cluster(client=True, nthreads=[], timeout=1800) +@gen_cluster(client=True, nthreads=[]) async def test_nanny_worker_port_range(c, s): with popen( [ From a08df0ac013c6c2a0012a633257bddf6abc73d17 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 18 Mar 2022 15:56:54 +0000 Subject: [PATCH 5/7] fix regressions --- distributed/cli/tests/test_dask_scheduler.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index f9e15607386..d73905401ff 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -63,13 +63,13 @@ def test_no_dashboard(loop): def test_dashboard(loop): pytest.importorskip("bokeh") - with popen(["dask-scheduler"]) as proc: - for line in proc.stderr: + with popen(["dask-scheduler"], flush_output=False) as proc: + for line in proc.stdout: if b"dashboard at" in line: dashboard_port = int(line.decode().split(":")[-1].strip()) break else: - raise Exception("dashboard not found") + assert False # pragma: nocover with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop): pass @@ -217,12 +217,17 @@ def test_scheduler_port_zero(loop): def test_dashboard_port_zero(loop): pytest.importorskip("bokeh") - with popen(["dask-scheduler", "--dashboard-address", ":0"]) as proc: - for line in proc.stderr: + with popen( + ["dask-scheduler", "--dashboard-address", ":0"], + flush_output=False, + ) as proc: + for line in proc.stdout: if b"dashboard at" in line: dashboard_port = int(line.decode().split(":")[-1].strip()) assert dashboard_port != 0 break + else: + assert False # pragma: nocover PRELOAD_TEXT = """ From a9b8f29b51e295f6e83d5c7e287b1529c316e727 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 18 Mar 2022 16:44:40 +0000 Subject: [PATCH 6/7] fix regression --- distributed/cli/tests/test_dask_ssh.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/distributed/cli/tests/test_dask_ssh.py b/distributed/cli/tests/test_dask_ssh.py index 62c85896e55..826946d77f8 100644 --- a/distributed/cli/tests/test_dask_ssh.py +++ b/distributed/cli/tests/test_dask_ssh.py @@ -18,14 +18,17 @@ def test_version_option(): assert result.exit_code == 0 +@pytest.mark.slow def test_ssh_cli_nprocs_renamed_to_nworkers(loop): - n_workers = 2 with popen( - ["dask-ssh", f"--nprocs={n_workers}", "--nohost", "localhost"], + ["dask-ssh", "--nprocs=2", "--nohost", "localhost"], flush_output=False, ) as proc: - with Client("tcp://127.0.0.1:8786", loop=loop) as c: - c.wait_for_workers(n_workers) + with Client("tcp://127.0.0.1:8786", timeout="15 seconds", loop=loop) as c: + c.wait_for_workers(2, timeout="15 seconds") + # This interrupt is necessary for the cluster to place output into the stdout + # and stderr pipes + proc.send_signal(2) assert any( b"renamed to --nworkers" in proc.stdout.readline() for _ in range(15) ) From 1b3e6c31603c45a33c01ccb7a5cf4f2a07f3e58c Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 18 Mar 2022 17:51:02 +0000 Subject: [PATCH 7/7] Don't abuse Proc.communicate --- distributed/cli/tests/test_dask_worker.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index be24fda9d46..e38b4d70be0 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -119,8 +119,7 @@ async def test_no_reconnect(c, s, nanny): "--no-reconnect", nanny, "--no-dashboard", - ], - flush_output=False, + ] ) as worker: # roundtrip works assert await c.submit(lambda x: x + 1, 10) == 11 @@ -129,7 +128,7 @@ async def test_no_reconnect(c, s, nanny): comm.abort() # worker terminates as soon as the connection is aborted - await to_thread(worker.communicate, timeout=5) + await to_thread(worker.wait, timeout=5) assert worker.returncode == 0 @@ -144,11 +143,10 @@ async def test_reconnect(c, s, nanny): "--reconnect", nanny, "--no-dashboard", - ], - flush_output=False, + ] ) as worker: # roundtrip works - await c.submit(lambda x: x + 1, 10) == 11 + assert await c.submit(lambda x: x + 1, 10) == 11 (comm,) = s.stream_comms.values() comm.abort() @@ -158,7 +156,7 @@ async def test_reconnect(c, s, nanny): # closing the scheduler cleanly does terminate the worker await s.close() - await to_thread(worker.communicate, timeout=5) + await to_thread(worker.wait, timeout=5) assert worker.returncode == 0