diff --git a/dask_jobqueue/tests/test_cluster.py b/dask_jobqueue/tests/test_cluster.py new file mode 100644 index 00000000..2bdfd6d3 --- /dev/null +++ b/dask_jobqueue/tests/test_cluster.py @@ -0,0 +1,190 @@ +import pytest +import asyncio +from time import time + +from dask.distributed import Client + +import dask_jobqueue +from dask_jobqueue.tests import QUEUE_WAIT +from dask_jobqueue.local import LocalCluster + +cluster_classes = [ + getattr(dask_jobqueue, attr) + for attr in dir(dask_jobqueue) + if attr.endswith("Cluster") +] +cluster_names = [cls.__name__.replace("Cluster", "").lower() for cls in cluster_classes] + +cluster_params = [ + pytest.param(cluster_cls, marks=[pytest.mark.env(cluster_name)]) + for cluster_cls, cluster_name in zip(cluster_classes, cluster_names) +] +cluster_params.append(pytest.param(LocalCluster)) + + +def all_checks(): + checks = [obj for name, obj in globals().items() if name.startswith("check_")] + print("checks:", checks) + return checks + + +async def check_basic(cluster, client): + cluster.scale(2) + + start = time() + while not client.scheduler_info()["workers"]: + await asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + future = client.submit(lambda x: x + 1, 10) + assert future.result(QUEUE_WAIT) == 11 + assert len(client.scheduler_info()["workers"]) > 0 + + workers = list(client.scheduler_info()["workers"].values()) + w = workers[0] + assert w["memory_limit"] == 2e9 / 4 + assert w["nthreads"] == 2 + + cluster.scale(0) + + start = time() + while client.scheduler_info()["workers"]: + asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + assert not cluster.workers and not cluster.worker_spec + + +async def check_scale_cores_memory(cluster, client): + cluster.scale(cores=2) + client.wait_for_workers(1) + + future = client.submit(lambda x: x + 1, 10) + assert future.result(QUEUE_WAIT) == 11 + assert cluster.workers + + workers = list(client.scheduler_info()["workers"].values()) + w = workers[0] + assert w["memory_limit"] == 2e9 + assert w["nthreads"] == 2 + + cluster.scale(memory="0GB") + + start = time() + while client.scheduler_info()["workers"]: + asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + assert not cluster.workers + + +async def check_basic_scale_edge_cases(cluster, client): + cluster.scale(2) + cluster.scale(0) + + # Wait to see what happens + asyncio.sleep(0.2) + start = time() + while cluster.workers: + asyncio.sleep(0.1) + assert time() < start + QUEUE_WAIT + + assert not cluster.workers + + +async def check_scale_grouped(cluster, client): + cluster.scale(4) # Start 2 jobs + + start = time() + + while len(list(client.scheduler_info()["workers"].values())) != 4: + asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + future = client.submit(lambda x: x + 1, 10) + assert future.result(QUEUE_WAIT) == 11 + # assert cluster.running_jobs + + workers = list(client.scheduler_info()["workers"].values()) + w = workers[0] + assert w["memory_limit"] == 1e9 + assert w["nthreads"] == 1 + assert len(workers) == 4 + + cluster.scale(1) # Should leave 2 workers, 1 job + + start = time() + while len(client.scheduler_info()["workers"]) != 2: + asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + cluster.scale(0) + + start = time() + + assert not cluster.worker_spec + while len(client.scheduler_info()["workers"]) != 0: + asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + +async def check_adaptive(cluster, client): + cluster.adapt() + with Client(cluster) as client: + future = client.submit(lambda x: x + 1, 10) + + start = time() + client.wait_for_workers(1) + + assert future.result(QUEUE_WAIT) == 11 + + del future + + start = time() + while client.scheduler_info()["workers"]: + asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + +async def check_adaptive_grouped(cluster, client): + cluster.adapt(minimum=1) # at least 1 worker + client.wait_for_workers(1) + + future = client.submit(lambda x: x + 1, 10) + assert future.result(QUEUE_WAIT) == 11 + + start = time() + processes = cluster._dummy_job.worker_processes + while len(client.scheduler_info()["workers"]) != processes: + asyncio.sleep(0.1) + assert time() < start + QUEUE_WAIT + + +async def check_adaptive_cores_mem(cluster, client): + cluster.adapt(minimum_cores=0, maximum_memory="4GB") + future = client.submit(lambda x: x + 1, 10) + assert future.result(QUEUE_WAIT) == 11 + + start = time() + processes = cluster._dummy_job.worker_processes + while len(client.scheduler_info()["workers"]) != processes: + asyncio.sleep(0.1) + assert time() < start + QUEUE_WAIT + + del future + + start = time() + while cluster.workers: + asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + +@pytest.mark.parametrize("cluster_cls", cluster_params) +@pytest.mark.parametrize("check", all_checks()) +@pytest.mark.asyncio +async def test(cluster_cls, check): + async with cluster_cls( + cores=6, memory="6GB", processes=2, asynchronous=True + ) as cluster: + async with Client(cluster, asynchronous=True) as client: + check(cluster, client) diff --git a/dask_jobqueue/tests/test_htcondor.py b/dask_jobqueue/tests/test_htcondor.py index cd9349f0..bae01f07 100644 --- a/dask_jobqueue/tests/test_htcondor.py +++ b/dask_jobqueue/tests/test_htcondor.py @@ -1,8 +1,4 @@ import sys -from time import sleep, time - -import pytest -from distributed import Client import dask @@ -50,35 +46,6 @@ def test_job_script(): assert "--nprocs 2" in job_script -@pytest.mark.env("htcondor") -def test_basic(loop): - with HTCondorCluster(cores=1, memory="100MB", disk="100MB", loop=loop) as cluster: - with Client(cluster) as client: - - cluster.scale(2) - - start = time() - while not (cluster.pending_jobs or cluster.running_jobs): - sleep(0.100) - assert time() < start + QUEUE_WAIT - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - assert cluster.running_jobs - - workers = list(client.scheduler_info()["workers"].values()) - w = workers[0] - assert w["memory_limit"] == 1e8 - assert w["nthreads"] == 1 - - cluster.scale(0) - - start = time() - while cluster.running_jobs: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - def test_config_name_htcondor_takes_custom_config(): conf = { "cores": 1, diff --git a/dask_jobqueue/tests/test_job.py b/dask_jobqueue/tests/test_job.py index 71f26f09..3312183b 100644 --- a/dask_jobqueue/tests/test_job.py +++ b/dask_jobqueue/tests/test_job.py @@ -24,7 +24,13 @@ import pytest +# General TODO for this module: should I get rid of most stuff or at least +# improve test_cluster.py taking inspiration from this? I guess I still want to +# test JobQueueCluster(job=...) maybe at least in a very simple way but at the +# same time I feel like this is duplicating the testing of FooCluster classes + +# TODO: move to common test def test_basic(): job = PBSJob(scheduler="127.0.0.1:12345", cores=1, memory="1 GB") assert "127.0.0.1:12345" in job.job_script() @@ -174,6 +180,7 @@ async def test_nprocs_scale(): assert len(cluster.worker_spec) == 1 +# TODO: this should move to test_cluster.py ? @pytest.mark.parametrize("Cluster", all_clusters) def test_docstring_cluster(Cluster): assert "cores :" in Cluster.__doc__ diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index 381b0acb..9f1f9dd6 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -3,17 +3,13 @@ import sys from textwrap import dedent import tempfile -from time import sleep, time import dask import pytest -from dask.distributed import Client from distributed.utils import parse_bytes from dask_jobqueue import LSFCluster, lsf -from . import QUEUE_WAIT - def test_header(): with LSFCluster(walltime="00:02", processes=4, cores=8, memory="8GB") as cluster: @@ -110,111 +106,7 @@ def test_job_script(): assert "--nthreads 2 --nprocs 4 --memory-limit 7.00GB" in job_script -@pytest.mark.env("lsf") -def test_basic(loop): - with LSFCluster( - walltime="00:02", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - loop=loop, - ) as cluster: - with Client(cluster) as client: - cluster.start_workers(2) - assert cluster.pending_jobs or cluster.running_jobs - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - assert cluster.running_jobs - - workers = list(client.scheduler_info()["workers"].values()) - w = workers[0] - assert w["memory_limit"] == 2e9 - assert w["nthreads"] == 2 - - cluster.stop_workers(workers) - - start = time() - while client.scheduler_info()["workers"]: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - assert not cluster.running_jobs - - -@pytest.mark.env("lsf") -def test_adaptive(loop): - with LSFCluster( - walltime="00:02", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - loop=loop, - ) as cluster: - cluster.adapt() - with Client(cluster) as client: - future = client.submit(lambda x: x + 1, 10) - - start = time() - while not (cluster.pending_jobs or cluster.running_jobs): - sleep(0.100) - assert time() < start + QUEUE_WAIT - - assert future.result(QUEUE_WAIT) == 11 - - start = time() - processes = cluster.worker_processes - while len(client.scheduler_info()["workers"]) != processes: - sleep(0.1) - assert time() < start + QUEUE_WAIT - - del future - - start = time() - while len(client.scheduler_info()["workers"]) > 0: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - start = time() - while cluster.pending_jobs or cluster.running_jobs: - sleep(0.100) - assert time() < start + QUEUE_WAIT - assert cluster.finished_jobs - - -@pytest.mark.env("lsf") -def test_adaptive_grouped(loop): - with LSFCluster( - walltime="00:02", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - loop=loop, - ) as cluster: - cluster.adapt(minimum=1) # at least 1 worker - with Client(cluster) as client: - start = time() - while not (cluster.pending_jobs or cluster.running_jobs): - sleep(0.100) - assert time() < start + QUEUE_WAIT - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - - start = time() - while not cluster.running_jobs: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - start = time() - processes = cluster.worker_processes - while len(client.scheduler_info()["workers"]) != processes: - sleep(0.1) - assert time() < start + QUEUE_WAIT - - +# TODO common test def test_config(loop): with dask.config.set( { @@ -254,6 +146,7 @@ def test_use_stdin(loop, config_value, constructor_value): assert cluster._dummy_job.use_stdin == config_value +# TODO common test def test_config_name_lsf_takes_custom_config(): conf = { "queue": "myqueue", @@ -282,6 +175,7 @@ def test_config_name_lsf_takes_custom_config(): assert cluster.job_name == "myname" +# TODO common test def test_informative_errors(): with pytest.raises(ValueError) as info: LSFCluster(memory=None, cores=4) diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index 2e92162c..79a0b392 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -1,14 +1,10 @@ import sys -from time import sleep, time import dask import pytest -from dask.distributed import Client from dask_jobqueue import MoabCluster, PBSCluster -from . import QUEUE_WAIT - @pytest.mark.parametrize("Cluster", [PBSCluster, MoabCluster]) def test_header(Cluster): @@ -98,234 +94,7 @@ def test_job_script(Cluster): assert "--nthreads 2 --nprocs 4 --memory-limit 7.00GB" in job_script -@pytest.mark.env("pbs") -def test_basic(loop): - with PBSCluster( - walltime="00:02:00", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - job_extra=["-V"], - loop=loop, - ) as cluster: - with Client(cluster) as client: - - cluster.scale(2) - client.wait_for_workers(2) - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - # assert cluster.running_jobs - - workers = list(client.scheduler_info()["workers"].values()) - w = workers[0] - assert w["memory_limit"] == 2e9 - assert w["nthreads"] == 2 - - cluster.scale(0) - - start = time() - while client.scheduler_info()["workers"]: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - assert not cluster.workers and not cluster.worker_spec - - -@pytest.mark.env("pbs") -def test_scale_cores_memory(loop): - with PBSCluster( - walltime="00:02:00", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - job_extra=["-V"], - loop=loop, - ) as cluster: - with Client(cluster) as client: - - cluster.scale(cores=2) - client.wait_for_workers(1) - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - assert cluster.workers - - workers = list(client.scheduler_info()["workers"].values()) - w = workers[0] - assert w["memory_limit"] == 2e9 - assert w["nthreads"] == 2 - - cluster.scale(memory="0GB") - - start = time() - while client.scheduler_info()["workers"]: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - assert not cluster.workers - - -@pytest.mark.env("pbs") -def test_basic_scale_edge_cases(loop): - with PBSCluster( - walltime="00:02:00", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - job_extra=["-V"], - loop=loop, - ) as cluster: - - cluster.scale(2) - cluster.scale(0) - - # Wait to see what happens - sleep(0.2) - start = time() - while cluster.workers: - sleep(0.1) - assert time() < start + QUEUE_WAIT - - assert not cluster.workers - - -@pytest.mark.env("pbs") -def test_adaptive(loop): - with PBSCluster( - walltime="00:02:00", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - job_extra=["-V"], - loop=loop, - ) as cluster: - cluster.adapt() - with Client(cluster) as client: - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - - start = time() - processes = cluster._dummy_job.worker_processes - while len(client.scheduler_info()["workers"]) != processes: - sleep(0.1) - assert time() < start + QUEUE_WAIT - - del future - - start = time() - while client.scheduler_info()["workers"] or cluster.workers: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - -@pytest.mark.env("pbs") -def test_adaptive_grouped(loop): - with PBSCluster( - walltime="00:02:00", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - job_extra=["-V"], - loop=loop, - ) as cluster: - cluster.adapt(minimum=1) # at least 1 worker - with Client(cluster) as client: - client.wait_for_workers(1) - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - - start = time() - processes = cluster._dummy_job.worker_processes - while len(client.scheduler_info()["workers"]) != processes: - sleep(0.1) - assert time() < start + QUEUE_WAIT - - -@pytest.mark.env("pbs") -def test_adaptive_cores_mem(loop): - with PBSCluster( - walltime="00:02:00", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - job_extra=["-V"], - loop=loop, - ) as cluster: - cluster.adapt(minimum_cores=0, maximum_memory="4GB") - with Client(cluster) as client: - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - - start = time() - processes = cluster._dummy_job.worker_processes - while len(client.scheduler_info()["workers"]) != processes: - sleep(0.1) - assert time() < start + QUEUE_WAIT - - del future - - start = time() - while cluster.workers: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - -@pytest.mark.env("pbs") -def test_scale_grouped(loop): - with PBSCluster( - walltime="00:02:00", - processes=2, - cores=2, - memory="2GB", - local_directory="/tmp", - job_extra=["-V"], - loop=loop, - ) as cluster: - with Client(cluster) as client: - - cluster.scale(4) # Start 2 jobs - - start = time() - - while len(list(client.scheduler_info()["workers"].values())) != 4: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - # assert cluster.running_jobs - - workers = list(client.scheduler_info()["workers"].values()) - w = workers[0] - assert w["memory_limit"] == 1e9 - assert w["nthreads"] == 1 - assert len(workers) == 4 - - cluster.scale(1) # Should leave 2 workers, 1 job - - start = time() - while len(client.scheduler_info()["workers"]) != 2: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - cluster.scale(0) - - start = time() - - assert not cluster.worker_spec - while len(client.scheduler_info()["workers"]) != 0: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - +# TODO make it a common test? def test_config(loop): with dask.config.set( {"jobqueue.pbs.walltime": "00:02:00", "jobqueue.pbs.local-directory": "/foo"} @@ -335,6 +104,7 @@ def test_config(loop): assert "--local-directory /foo" in cluster.job_script() +# TODO make it a common test def test_config_name_pbs_takes_custom_config(): conf = { "queue": "myqueue", @@ -363,6 +133,7 @@ def test_config_name_pbs_takes_custom_config(): assert cluster.job_name == "myname" +# TODO common test def test_informative_errors(): with pytest.raises(ValueError) as info: PBSCluster(memory=None, cores=4) @@ -373,6 +144,7 @@ def test_informative_errors(): assert "cores" in str(info.value) +# TODO Hmmm what is this testing? @pytest.mark.asyncio async def test_adapt(loop): async with PBSCluster(cores=1, memory="1 GB", asynchronous=True) as cluster: diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index 6020dabb..abc48afa 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -9,37 +9,6 @@ from . import QUEUE_WAIT -@pytest.mark.env("sge") -def test_basic(loop): - with SGECluster( - walltime="00:02:00", cores=8, processes=4, memory="2GB", loop=loop - ) as cluster: - with Client(cluster, loop=loop) as client: - - cluster.scale(2) - - start = time() - while not client.scheduler_info()["workers"]: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - assert len(client.scheduler_info()["workers"]) > 0 - - workers = list(client.scheduler_info()["workers"].values()) - w = workers[0] - assert w["memory_limit"] == 2e9 / 4 - assert w["nthreads"] == 2 - - cluster.scale(0) - - start = time() - while client.scheduler_info()["workers"]: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - def test_config_name_sge_takes_custom_config(): conf = { "queue": "myqueue", @@ -102,6 +71,7 @@ def test_job_script(tmpdir): assert each in job_script +# TODO what should I do with this one? Should I do it for all clusters? @pytest.mark.env("sge") def test_complex_cancel_command(loop): with SGECluster( diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index 6b26ffc0..3998d917 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -1,15 +1,9 @@ import sys -from time import sleep, time - -import pytest -from distributed import Client import dask from dask_jobqueue import SLURMCluster -from . import QUEUE_WAIT - def test_header(): with SLURMCluster( @@ -108,66 +102,7 @@ def test_job_script(): assert "--nthreads 2 --nprocs 4 --memory-limit 7.00GB" in job_script -@pytest.mark.env("slurm") -def test_basic(loop): - with SLURMCluster( - walltime="00:02:00", - cores=2, - processes=1, - memory="2GB", - # job_extra=["-D /"], - loop=loop, - ) as cluster: - with Client(cluster) as client: - - cluster.scale(2) - - start = time() - client.wait_for_workers(2) - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - - workers = list(client.scheduler_info()["workers"].values()) - w = workers[0] - assert w["memory_limit"] == 2e9 - assert w["nthreads"] == 2 - - cluster.scale(0) - - start = time() - while client.scheduler_info()["workers"]: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - -@pytest.mark.env("slurm") -def test_adaptive(loop): - with SLURMCluster( - walltime="00:02:00", - cores=2, - processes=1, - memory="2GB", - # job_extra=["-D /"], - loop=loop, - ) as cluster: - cluster.adapt() - with Client(cluster) as client: - future = client.submit(lambda x: x + 1, 10) - - start = time() - client.wait_for_workers(1) - - assert future.result(QUEUE_WAIT) == 11 - - del future - - start = time() - while client.scheduler_info()["workers"]: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - +# TODO I should turn that into a common test as well def test_config_name_slurm_takes_custom_config(): conf = { "queue": "myqueue",