From 9892ffd670e4a11b2f3c5b08a5bfcd202d12c9cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Sat, 12 Oct 2019 21:03:01 +0200 Subject: [PATCH 1/3] wip --- dask_jobqueue/tests/test_cluster.py | 183 +++++++++++++++++++++ dask_jobqueue/tests/test_htcondor.py | 29 ---- dask_jobqueue/tests/test_job.py | 6 + dask_jobqueue/tests/test_lsf.py | 109 +------------ dask_jobqueue/tests/test_pbs.py | 234 +-------------------------- dask_jobqueue/tests/test_sge.py | 32 +--- dask_jobqueue/tests/test_slurm.py | 61 +------ 7 files changed, 198 insertions(+), 456 deletions(-) create mode 100644 dask_jobqueue/tests/test_cluster.py diff --git a/dask_jobqueue/tests/test_cluster.py b/dask_jobqueue/tests/test_cluster.py new file mode 100644 index 00000000..f269daf8 --- /dev/null +++ b/dask_jobqueue/tests/test_cluster.py @@ -0,0 +1,183 @@ +import pytest +import asyncio +from time import time + +from dask.distributed import Client + +import dask_jobqueue +from dask_jobqueue.tests import QUEUE_WAIT +from dask_jobqueue.local import LocalCluster + +cluster_classes = [getattr(dask_jobqueue, attr) for attr in dir(dask_jobqueue) if attr.endswith('Cluster')] +cluster_names = [cls.__name__.replace('Cluster', '').lower() for cls in cluster_classes] + +cluster_params = [pytest.param(cluster_cls, marks=[pytest.mark.env(cluster_name)]) + for cluster_cls, cluster_name in zip(cluster_classes, cluster_names)] +cluster_params.append(pytest.param(LocalCluster)) + + +def all_checks(): + checks = [obj for name, obj in globals().items() if name.startswith('check_')] + print('checks:', checks) + return checks + + +async def check_basic(cluster, client): + cluster.scale(2) + + start = time() + while not client.scheduler_info()["workers"]: + await asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + future = client.submit(lambda x: x + 1, 10) + assert future.result(QUEUE_WAIT) == 11 + assert len(client.scheduler_info()["workers"]) > 0 + + workers = list(client.scheduler_info()["workers"].values()) + w = workers[0] + assert w["memory_limit"] == 2e9 / 4 + assert w["nthreads"] == 2 + + cluster.scale(0) + + start = time() + while client.scheduler_info()["workers"]: + asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + assert not cluster.workers and not cluster.worker_spec + + +async def check_scale_cores_memory(cluster, client): + cluster.scale(cores=2) + client.wait_for_workers(1) + + future = client.submit(lambda x: x + 1, 10) + assert future.result(QUEUE_WAIT) == 11 + assert cluster.workers + + workers = list(client.scheduler_info()["workers"].values()) + w = workers[0] + assert w["memory_limit"] == 2e9 + assert w["nthreads"] == 2 + + cluster.scale(memory="0GB") + + start = time() + while client.scheduler_info()["workers"]: + asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + assert not cluster.workers + + +async def check_basic_scale_edge_cases(cluster, client): + cluster.scale(2) + cluster.scale(0) + + # Wait to see what happens + asyncio.sleep(0.2) + start = time() + while cluster.workers: + asyncio.sleep(0.1) + assert time() < start + QUEUE_WAIT + + assert not cluster.workers + + +async def check_scale_grouped(cluster, client): + cluster.scale(4) # Start 2 jobs + + start = time() + + while len(list(client.scheduler_info()["workers"].values())) != 4: + asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + future = client.submit(lambda x: x + 1, 10) + assert future.result(QUEUE_WAIT) == 11 + # assert cluster.running_jobs + + workers = list(client.scheduler_info()["workers"].values()) + w = workers[0] + assert w["memory_limit"] == 1e9 + assert w["nthreads"] == 1 + assert len(workers) == 4 + + cluster.scale(1) # Should leave 2 workers, 1 job + + start = time() + while len(client.scheduler_info()["workers"]) != 2: + asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + cluster.scale(0) + + start = time() + + assert not cluster.worker_spec + while len(client.scheduler_info()["workers"]) != 0: + asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + +async def check_adaptive(cluster, client): + cluster.adapt() + with Client(cluster) as client: + future = client.submit(lambda x: x + 1, 10) + + start = time() + client.wait_for_workers(1) + + assert future.result(QUEUE_WAIT) == 11 + + del future + + start = time() + while client.scheduler_info()["workers"]: + asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + + +async def check_adaptive_grouped(cluster, client): + cluster.adapt(minimum=1) # at least 1 worker + client.wait_for_workers(1) + + future = client.submit(lambda x: x + 1, 10) + assert future.result(QUEUE_WAIT) == 11 + + start = time() + processes = cluster._dummy_job.worker_processes + while len(client.scheduler_info()["workers"]) != processes: + asyncio.sleep(0.1) + assert time() < start + QUEUE_WAIT + + +async def check_adaptive_cores_mem(cluster, client): + cluster.adapt(minimum_cores=0, maximum_memory="4GB") + future = client.submit(lambda x: x + 1, 10) + assert future.result(QUEUE_WAIT) == 11 + + start = time() + processes = cluster._dummy_job.worker_processes + while len(client.scheduler_info()["workers"]) != processes: + asyncio.sleep(0.1) + assert time() < start + QUEUE_WAIT + + del future + + start = time() + while cluster.workers: + asyncio.sleep(0.100) + assert time() < start + QUEUE_WAIT + +@pytest.mark.parametrize("cluster_cls", cluster_params) +@pytest.mark.parametrize("check", all_checks()) +@pytest.mark.asyncio +async def test(cluster_cls, check): + async with cluster_cls( + cores=6, memory="6GB", processes=2, asynchronous=True + ) as cluster: + async with Client(cluster, asynchronous=True) as client: + check(cluster, client) diff --git a/dask_jobqueue/tests/test_htcondor.py b/dask_jobqueue/tests/test_htcondor.py index cd9349f0..855ae274 100644 --- a/dask_jobqueue/tests/test_htcondor.py +++ b/dask_jobqueue/tests/test_htcondor.py @@ -50,35 +50,6 @@ def test_job_script(): assert "--nprocs 2" in job_script -@pytest.mark.env("htcondor") -def test_basic(loop): - with HTCondorCluster(cores=1, memory="100MB", disk="100MB", loop=loop) as cluster: - with Client(cluster) as client: - - cluster.scale(2) - - start = time() - while not (cluster.pending_jobs or cluster.running_jobs): - sleep(0.100) - assert time() < start + QUEUE_WAIT - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - assert cluster.running_jobs - - workers = list(client.scheduler_info()["workers"].values()) - w = workers[0] - assert w["memory_limit"] == 1e8 - assert w["nthreads"] == 1 - - cluster.scale(0) - - start = time() - while cluster.running_jobs: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - def test_config_name_htcondor_takes_custom_config(): conf = { "cores": 1, diff --git a/dask_jobqueue/tests/test_job.py b/dask_jobqueue/tests/test_job.py index 71f26f09..53b46c18 100644 --- a/dask_jobqueue/tests/test_job.py +++ b/dask_jobqueue/tests/test_job.py @@ -24,7 +24,12 @@ import pytest +# General TODO for this module: should I get rid of most stuff or at least +# improve test_cluster.py taking inspiration from this? I guess I still want to +# test JobQueueCluster(job=...) maybe at least in a very simple way but at the +# same time I feel like this is duplicating the testing of FooCluster classes +# TODO: move to common test def test_basic(): job = PBSJob(scheduler="127.0.0.1:12345", cores=1, memory="1 GB") assert "127.0.0.1:12345" in job.job_script() @@ -174,6 +179,7 @@ async def test_nprocs_scale(): assert len(cluster.worker_spec) == 1 +# TODO: this should move to test_cluster.py ? @pytest.mark.parametrize("Cluster", all_clusters) def test_docstring_cluster(Cluster): assert "cores :" in Cluster.__doc__ diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index 381b0acb..927619e9 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -110,111 +110,7 @@ def test_job_script(): assert "--nthreads 2 --nprocs 4 --memory-limit 7.00GB" in job_script -@pytest.mark.env("lsf") -def test_basic(loop): - with LSFCluster( - walltime="00:02", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - loop=loop, - ) as cluster: - with Client(cluster) as client: - cluster.start_workers(2) - assert cluster.pending_jobs or cluster.running_jobs - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - assert cluster.running_jobs - - workers = list(client.scheduler_info()["workers"].values()) - w = workers[0] - assert w["memory_limit"] == 2e9 - assert w["nthreads"] == 2 - - cluster.stop_workers(workers) - - start = time() - while client.scheduler_info()["workers"]: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - assert not cluster.running_jobs - - -@pytest.mark.env("lsf") -def test_adaptive(loop): - with LSFCluster( - walltime="00:02", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - loop=loop, - ) as cluster: - cluster.adapt() - with Client(cluster) as client: - future = client.submit(lambda x: x + 1, 10) - - start = time() - while not (cluster.pending_jobs or cluster.running_jobs): - sleep(0.100) - assert time() < start + QUEUE_WAIT - - assert future.result(QUEUE_WAIT) == 11 - - start = time() - processes = cluster.worker_processes - while len(client.scheduler_info()["workers"]) != processes: - sleep(0.1) - assert time() < start + QUEUE_WAIT - - del future - - start = time() - while len(client.scheduler_info()["workers"]) > 0: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - start = time() - while cluster.pending_jobs or cluster.running_jobs: - sleep(0.100) - assert time() < start + QUEUE_WAIT - assert cluster.finished_jobs - - -@pytest.mark.env("lsf") -def test_adaptive_grouped(loop): - with LSFCluster( - walltime="00:02", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - loop=loop, - ) as cluster: - cluster.adapt(minimum=1) # at least 1 worker - with Client(cluster) as client: - start = time() - while not (cluster.pending_jobs or cluster.running_jobs): - sleep(0.100) - assert time() < start + QUEUE_WAIT - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - - start = time() - while not cluster.running_jobs: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - start = time() - processes = cluster.worker_processes - while len(client.scheduler_info()["workers"]) != processes: - sleep(0.1) - assert time() < start + QUEUE_WAIT - - +# TODO common test def test_config(loop): with dask.config.set( { @@ -254,6 +150,7 @@ def test_use_stdin(loop, config_value, constructor_value): assert cluster._dummy_job.use_stdin == config_value +# TODO common test def test_config_name_lsf_takes_custom_config(): conf = { "queue": "myqueue", @@ -281,7 +178,7 @@ def test_config_name_lsf_takes_custom_config(): with LSFCluster(config_name="lsf-config-name") as cluster: assert cluster.job_name == "myname" - +# TODO common test def test_informative_errors(): with pytest.raises(ValueError) as info: LSFCluster(memory=None, cores=4) diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index 2e92162c..55603149 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -98,234 +98,7 @@ def test_job_script(Cluster): assert "--nthreads 2 --nprocs 4 --memory-limit 7.00GB" in job_script -@pytest.mark.env("pbs") -def test_basic(loop): - with PBSCluster( - walltime="00:02:00", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - job_extra=["-V"], - loop=loop, - ) as cluster: - with Client(cluster) as client: - - cluster.scale(2) - client.wait_for_workers(2) - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - # assert cluster.running_jobs - - workers = list(client.scheduler_info()["workers"].values()) - w = workers[0] - assert w["memory_limit"] == 2e9 - assert w["nthreads"] == 2 - - cluster.scale(0) - - start = time() - while client.scheduler_info()["workers"]: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - assert not cluster.workers and not cluster.worker_spec - - -@pytest.mark.env("pbs") -def test_scale_cores_memory(loop): - with PBSCluster( - walltime="00:02:00", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - job_extra=["-V"], - loop=loop, - ) as cluster: - with Client(cluster) as client: - - cluster.scale(cores=2) - client.wait_for_workers(1) - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - assert cluster.workers - - workers = list(client.scheduler_info()["workers"].values()) - w = workers[0] - assert w["memory_limit"] == 2e9 - assert w["nthreads"] == 2 - - cluster.scale(memory="0GB") - - start = time() - while client.scheduler_info()["workers"]: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - assert not cluster.workers - - -@pytest.mark.env("pbs") -def test_basic_scale_edge_cases(loop): - with PBSCluster( - walltime="00:02:00", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - job_extra=["-V"], - loop=loop, - ) as cluster: - - cluster.scale(2) - cluster.scale(0) - - # Wait to see what happens - sleep(0.2) - start = time() - while cluster.workers: - sleep(0.1) - assert time() < start + QUEUE_WAIT - - assert not cluster.workers - - -@pytest.mark.env("pbs") -def test_adaptive(loop): - with PBSCluster( - walltime="00:02:00", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - job_extra=["-V"], - loop=loop, - ) as cluster: - cluster.adapt() - with Client(cluster) as client: - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - - start = time() - processes = cluster._dummy_job.worker_processes - while len(client.scheduler_info()["workers"]) != processes: - sleep(0.1) - assert time() < start + QUEUE_WAIT - - del future - - start = time() - while client.scheduler_info()["workers"] or cluster.workers: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - -@pytest.mark.env("pbs") -def test_adaptive_grouped(loop): - with PBSCluster( - walltime="00:02:00", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - job_extra=["-V"], - loop=loop, - ) as cluster: - cluster.adapt(minimum=1) # at least 1 worker - with Client(cluster) as client: - client.wait_for_workers(1) - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - - start = time() - processes = cluster._dummy_job.worker_processes - while len(client.scheduler_info()["workers"]) != processes: - sleep(0.1) - assert time() < start + QUEUE_WAIT - - -@pytest.mark.env("pbs") -def test_adaptive_cores_mem(loop): - with PBSCluster( - walltime="00:02:00", - processes=1, - cores=2, - memory="2GB", - local_directory="/tmp", - job_extra=["-V"], - loop=loop, - ) as cluster: - cluster.adapt(minimum_cores=0, maximum_memory="4GB") - with Client(cluster) as client: - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - - start = time() - processes = cluster._dummy_job.worker_processes - while len(client.scheduler_info()["workers"]) != processes: - sleep(0.1) - assert time() < start + QUEUE_WAIT - - del future - - start = time() - while cluster.workers: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - -@pytest.mark.env("pbs") -def test_scale_grouped(loop): - with PBSCluster( - walltime="00:02:00", - processes=2, - cores=2, - memory="2GB", - local_directory="/tmp", - job_extra=["-V"], - loop=loop, - ) as cluster: - with Client(cluster) as client: - - cluster.scale(4) # Start 2 jobs - - start = time() - - while len(list(client.scheduler_info()["workers"].values())) != 4: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - # assert cluster.running_jobs - - workers = list(client.scheduler_info()["workers"].values()) - w = workers[0] - assert w["memory_limit"] == 1e9 - assert w["nthreads"] == 1 - assert len(workers) == 4 - - cluster.scale(1) # Should leave 2 workers, 1 job - - start = time() - while len(client.scheduler_info()["workers"]) != 2: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - cluster.scale(0) - - start = time() - - assert not cluster.worker_spec - while len(client.scheduler_info()["workers"]) != 0: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - +# TODO make it a common test? def test_config(loop): with dask.config.set( {"jobqueue.pbs.walltime": "00:02:00", "jobqueue.pbs.local-directory": "/foo"} @@ -334,7 +107,7 @@ def test_config(loop): assert "00:02:00" in cluster.job_script() assert "--local-directory /foo" in cluster.job_script() - +# TODO make it a common test def test_config_name_pbs_takes_custom_config(): conf = { "queue": "myqueue", @@ -362,7 +135,7 @@ def test_config_name_pbs_takes_custom_config(): with PBSCluster(config_name="pbs-config-name") as cluster: assert cluster.job_name == "myname" - +# TODO common test def test_informative_errors(): with pytest.raises(ValueError) as info: PBSCluster(memory=None, cores=4) @@ -373,6 +146,7 @@ def test_informative_errors(): assert "cores" in str(info.value) +# TODO Hmmm what is this testing? @pytest.mark.asyncio async def test_adapt(loop): async with PBSCluster(cores=1, memory="1 GB", asynchronous=True) as cluster: diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index 6020dabb..abc48afa 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -9,37 +9,6 @@ from . import QUEUE_WAIT -@pytest.mark.env("sge") -def test_basic(loop): - with SGECluster( - walltime="00:02:00", cores=8, processes=4, memory="2GB", loop=loop - ) as cluster: - with Client(cluster, loop=loop) as client: - - cluster.scale(2) - - start = time() - while not client.scheduler_info()["workers"]: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - assert len(client.scheduler_info()["workers"]) > 0 - - workers = list(client.scheduler_info()["workers"].values()) - w = workers[0] - assert w["memory_limit"] == 2e9 / 4 - assert w["nthreads"] == 2 - - cluster.scale(0) - - start = time() - while client.scheduler_info()["workers"]: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - def test_config_name_sge_takes_custom_config(): conf = { "queue": "myqueue", @@ -102,6 +71,7 @@ def test_job_script(tmpdir): assert each in job_script +# TODO what should I do with this one? Should I do it for all clusters? @pytest.mark.env("sge") def test_complex_cancel_command(loop): with SGECluster( diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index 6b26ffc0..c0c08d34 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -108,66 +108,7 @@ def test_job_script(): assert "--nthreads 2 --nprocs 4 --memory-limit 7.00GB" in job_script -@pytest.mark.env("slurm") -def test_basic(loop): - with SLURMCluster( - walltime="00:02:00", - cores=2, - processes=1, - memory="2GB", - # job_extra=["-D /"], - loop=loop, - ) as cluster: - with Client(cluster) as client: - - cluster.scale(2) - - start = time() - client.wait_for_workers(2) - - future = client.submit(lambda x: x + 1, 10) - assert future.result(QUEUE_WAIT) == 11 - - workers = list(client.scheduler_info()["workers"].values()) - w = workers[0] - assert w["memory_limit"] == 2e9 - assert w["nthreads"] == 2 - - cluster.scale(0) - - start = time() - while client.scheduler_info()["workers"]: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - -@pytest.mark.env("slurm") -def test_adaptive(loop): - with SLURMCluster( - walltime="00:02:00", - cores=2, - processes=1, - memory="2GB", - # job_extra=["-D /"], - loop=loop, - ) as cluster: - cluster.adapt() - with Client(cluster) as client: - future = client.submit(lambda x: x + 1, 10) - - start = time() - client.wait_for_workers(1) - - assert future.result(QUEUE_WAIT) == 11 - - del future - - start = time() - while client.scheduler_info()["workers"]: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - +# TODO I should turn that into a common test as well def test_config_name_slurm_takes_custom_config(): conf = { "queue": "myqueue", From 7ced6d51af01b448df4b93c514f0c653d057c1d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Wed, 16 Oct 2019 18:02:15 +0200 Subject: [PATCH 2/3] flake8 --- dask_jobqueue/tests/test_cluster.py | 1 + dask_jobqueue/tests/test_htcondor.py | 4 ---- dask_jobqueue/tests/test_job.py | 1 + dask_jobqueue/tests/test_lsf.py | 5 +---- dask_jobqueue/tests/test_pbs.py | 6 ++---- dask_jobqueue/tests/test_slurm.py | 6 ------ 6 files changed, 5 insertions(+), 18 deletions(-) diff --git a/dask_jobqueue/tests/test_cluster.py b/dask_jobqueue/tests/test_cluster.py index f269daf8..f1929e17 100644 --- a/dask_jobqueue/tests/test_cluster.py +++ b/dask_jobqueue/tests/test_cluster.py @@ -172,6 +172,7 @@ async def check_adaptive_cores_mem(cluster, client): asyncio.sleep(0.100) assert time() < start + QUEUE_WAIT + @pytest.mark.parametrize("cluster_cls", cluster_params) @pytest.mark.parametrize("check", all_checks()) @pytest.mark.asyncio diff --git a/dask_jobqueue/tests/test_htcondor.py b/dask_jobqueue/tests/test_htcondor.py index 855ae274..bae01f07 100644 --- a/dask_jobqueue/tests/test_htcondor.py +++ b/dask_jobqueue/tests/test_htcondor.py @@ -1,8 +1,4 @@ import sys -from time import sleep, time - -import pytest -from distributed import Client import dask diff --git a/dask_jobqueue/tests/test_job.py b/dask_jobqueue/tests/test_job.py index 53b46c18..3312183b 100644 --- a/dask_jobqueue/tests/test_job.py +++ b/dask_jobqueue/tests/test_job.py @@ -29,6 +29,7 @@ # test JobQueueCluster(job=...) maybe at least in a very simple way but at the # same time I feel like this is duplicating the testing of FooCluster classes + # TODO: move to common test def test_basic(): job = PBSJob(scheduler="127.0.0.1:12345", cores=1, memory="1 GB") diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index 927619e9..9f1f9dd6 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -3,17 +3,13 @@ import sys from textwrap import dedent import tempfile -from time import sleep, time import dask import pytest -from dask.distributed import Client from distributed.utils import parse_bytes from dask_jobqueue import LSFCluster, lsf -from . import QUEUE_WAIT - def test_header(): with LSFCluster(walltime="00:02", processes=4, cores=8, memory="8GB") as cluster: @@ -178,6 +174,7 @@ def test_config_name_lsf_takes_custom_config(): with LSFCluster(config_name="lsf-config-name") as cluster: assert cluster.job_name == "myname" + # TODO common test def test_informative_errors(): with pytest.raises(ValueError) as info: diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index 55603149..79a0b392 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -1,14 +1,10 @@ import sys -from time import sleep, time import dask import pytest -from dask.distributed import Client from dask_jobqueue import MoabCluster, PBSCluster -from . import QUEUE_WAIT - @pytest.mark.parametrize("Cluster", [PBSCluster, MoabCluster]) def test_header(Cluster): @@ -107,6 +103,7 @@ def test_config(loop): assert "00:02:00" in cluster.job_script() assert "--local-directory /foo" in cluster.job_script() + # TODO make it a common test def test_config_name_pbs_takes_custom_config(): conf = { @@ -135,6 +132,7 @@ def test_config_name_pbs_takes_custom_config(): with PBSCluster(config_name="pbs-config-name") as cluster: assert cluster.job_name == "myname" + # TODO common test def test_informative_errors(): with pytest.raises(ValueError) as info: diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index c0c08d34..3998d917 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -1,15 +1,9 @@ import sys -from time import sleep, time - -import pytest -from distributed import Client import dask from dask_jobqueue import SLURMCluster -from . import QUEUE_WAIT - def test_header(): with SLURMCluster( From 2d7da96f5fc7f5c5a77bfb1abff7e151d5b00ac7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Sun, 1 Dec 2019 16:47:47 +0100 Subject: [PATCH 3/3] Fix black. --- dask_jobqueue/tests/test_cluster.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/dask_jobqueue/tests/test_cluster.py b/dask_jobqueue/tests/test_cluster.py index f1929e17..2bdfd6d3 100644 --- a/dask_jobqueue/tests/test_cluster.py +++ b/dask_jobqueue/tests/test_cluster.py @@ -8,17 +8,23 @@ from dask_jobqueue.tests import QUEUE_WAIT from dask_jobqueue.local import LocalCluster -cluster_classes = [getattr(dask_jobqueue, attr) for attr in dir(dask_jobqueue) if attr.endswith('Cluster')] -cluster_names = [cls.__name__.replace('Cluster', '').lower() for cls in cluster_classes] - -cluster_params = [pytest.param(cluster_cls, marks=[pytest.mark.env(cluster_name)]) - for cluster_cls, cluster_name in zip(cluster_classes, cluster_names)] +cluster_classes = [ + getattr(dask_jobqueue, attr) + for attr in dir(dask_jobqueue) + if attr.endswith("Cluster") +] +cluster_names = [cls.__name__.replace("Cluster", "").lower() for cls in cluster_classes] + +cluster_params = [ + pytest.param(cluster_cls, marks=[pytest.mark.env(cluster_name)]) + for cluster_cls, cluster_name in zip(cluster_classes, cluster_names) +] cluster_params.append(pytest.param(LocalCluster)) def all_checks(): - checks = [obj for name, obj in globals().items() if name.startswith('check_')] - print('checks:', checks) + checks = [obj for name, obj in globals().items() if name.startswith("check_")] + print("checks:", checks) return checks