diff --git a/conftest.py b/conftest.py index a191a990..34c44d8d 100644 --- a/conftest.py +++ b/conftest.py @@ -6,6 +6,19 @@ import pytest import dask_jobqueue.lsf +import dask + +from dask_jobqueue import ( + PBSCluster, + MoabCluster, + SLURMCluster, + SGECluster, + LSFCluster, + OARCluster, + HTCondorCluster, +) + +from dask_jobqueue.local import LocalCluster def pytest_addoption(parser): @@ -18,19 +31,35 @@ def pytest_addoption(parser): def pytest_configure(config): - # register an additional marker + # register additional markers config.addinivalue_line( "markers", "env(NAME): only run test if environment NAME matches" ) + config.addinivalue_line( + "markers", "xfail_env(NAME): known failure for environment NAME" + ) def pytest_runtest_setup(item): - envnames = [mark.args[0] for mark in item.iter_markers(name="env")] - if (item.config.getoption("-E") is None and envnames) or ( - item.config.getoption("-E") is not None - and item.config.getoption("-E") not in envnames + env = item.config.getoption("-E") + envnames = sum( + [ + mark.args[0] if isinstance(mark.args[0], list) else [mark.args[0]] + for mark in item.iter_markers(name="env") + ], + [], + ) + if ( + None not in envnames + and (env is None and envnames) + or (env is not None and env not in envnames) ): pytest.skip("test requires env in %r" % envnames) + else: + xfail = {} + [xfail.update(mark.args[0]) for mark in item.iter_markers(name="xfail_env")] + if env in xfail: + item.add_marker(pytest.mark.xfail(reason=xfail[env])) @pytest.fixture(autouse=True) @@ -46,3 +75,35 @@ def mock_lsf_version(monkeypatch, request): except OSError: # Provide a fake implementation of lsf_version() monkeypatch.setattr(dask_jobqueue.lsf, "lsf_version", lambda: "10") + + +all_envs = { + None: LocalCluster, + "pbs": PBSCluster, + "moab": MoabCluster, + "slurm": SLURMCluster, + "sge": SGECluster, + "lsf": LSFCluster, + "oar": OARCluster, + "htcondor": HTCondorCluster, +} + + +@pytest.fixture( + params=[pytest.param(v, marks=[pytest.mark.env(k)]) for (k, v) in all_envs.items()] +) +def EnvSpecificCluster(request): + """Run test only with the specific cluster class set by the environment""" + if request.param == HTCondorCluster: + # HTCondor requires explicitly specifying requested disk space + dask.config.set({"jobqueue.htcondor.disk": "1GB"}) + return request.param + + +@pytest.fixture(params=list(all_envs.values())) +def Cluster(request): + """Run test for each cluster class when no environment is set (test should not require the actual scheduler)""" + if request.param == HTCondorCluster: + # HTCondor requires explicitly specifying requested disk space + dask.config.set({"jobqueue.htcondor.disk": "1GB"}) + return request.param diff --git a/dask_jobqueue/tests/test_job.py b/dask_jobqueue/tests/test_job.py index 1a2a00a7..fa94febd 100644 --- a/dask_jobqueue/tests/test_job.py +++ b/dask_jobqueue/tests/test_job.py @@ -1,24 +1,8 @@ import asyncio from time import time -from dask_jobqueue import ( - PBSCluster, - SGECluster, - SLURMCluster, - LSFCluster, - HTCondorCluster, - MoabCluster, - OARCluster, -) -from dask_jobqueue.local import LocalJob, LocalCluster +from dask_jobqueue.local import LocalCluster from dask_jobqueue.pbs import PBSJob -from dask_jobqueue.sge import SGEJob -from dask_jobqueue.slurm import SLURMJob -from dask_jobqueue.lsf import LSFJob -from dask_jobqueue.moab import MoabJob -from dask_jobqueue.htcondor import HTCondorJob -from dask_jobqueue.oar import OARJob - from dask_jobqueue.core import JobQueueCluster from dask.distributed import Scheduler, Client from distributed.core import Status @@ -26,36 +10,15 @@ import pytest -def test_basic(): - job = PBSJob(scheduler="127.0.0.1:12345", cores=1, memory="1 GB") +def test_basic(Cluster): + job_cls = Cluster.job_cls + job = job_cls(scheduler="127.0.0.1:12345", cores=1, memory="1 GB") assert "127.0.0.1:12345" in job.job_script() -job_protected = [ - pytest.param(SGEJob, marks=[pytest.mark.env("sge")]), - pytest.param(PBSJob, marks=[pytest.mark.env("pbs")]), - pytest.param(SLURMJob, marks=[pytest.mark.env("slurm")]), - pytest.param(LSFJob, marks=[pytest.mark.env("lsf")]), - LocalJob, -] - - -all_jobs = [SGEJob, PBSJob, SLURMJob, LSFJob, HTCondorJob, MoabJob, OARJob] -all_clusters = [ - SGECluster, - PBSCluster, - SLURMCluster, - LSFCluster, - HTCondorCluster, - MoabCluster, - OARCluster, - HTCondorCluster, -] - - -@pytest.mark.parametrize("job_cls", job_protected) @pytest.mark.asyncio -async def test_job(job_cls): +async def test_job(EnvSpecificCluster): + job_cls = EnvSpecificCluster.job_cls async with Scheduler(port=0) as s: job = job_cls(scheduler=s.address, name="foo", cores=1, memory="1GB") job = await job @@ -71,9 +34,9 @@ async def test_job(job_cls): assert time() < start + 10 -@pytest.mark.parametrize("job_cls", job_protected) @pytest.mark.asyncio -async def test_cluster(job_cls): +async def test_cluster(EnvSpecificCluster): + job_cls = EnvSpecificCluster.job_cls async with JobQueueCluster( 1, cores=1, memory="1GB", job_cls=job_cls, asynchronous=True, name="foo" ) as cluster: @@ -94,9 +57,9 @@ async def test_cluster(job_cls): assert time() < start + 10 -@pytest.mark.parametrize("job_cls", job_protected) @pytest.mark.asyncio -async def test_adapt(job_cls): +async def test_adapt(EnvSpecificCluster): + job_cls = EnvSpecificCluster.job_cls async with JobQueueCluster( 1, cores=1, memory="1GB", job_cls=job_cls, asynchronous=True, name="foo" ) as cluster: @@ -124,9 +87,9 @@ async def test_adapt(job_cls): assert not cluster.workers -@pytest.mark.parametrize("job_cls", job_protected) @pytest.mark.asyncio -async def test_adapt_parameters(job_cls): +async def test_adapt_parameters(EnvSpecificCluster): + job_cls = EnvSpecificCluster.job_cls async with JobQueueCluster( cores=2, memory="1GB", processes=2, job_cls=job_cls, asynchronous=True ) as cluster: @@ -176,7 +139,6 @@ async def test_nprocs_scale(): assert len(cluster.worker_spec) == 1 -@pytest.mark.parametrize("Cluster", all_clusters) def test_docstring_cluster(Cluster): assert "cores :" in Cluster.__doc__ assert Cluster.__name__[: -len("Cluster")] in Cluster.__doc__ diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index b616392c..175d9438 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -14,12 +14,6 @@ from dask_jobqueue import ( JobQueueCluster, - PBSCluster, - MoabCluster, - SLURMCluster, - SGECluster, - LSFCluster, - OARCluster, HTCondorCluster, ) from dask_jobqueue.core import Job @@ -27,23 +21,6 @@ from dask_jobqueue.sge import SGEJob -all_clusters = [ - PBSCluster, - MoabCluster, - SLURMCluster, - SGECluster, - LSFCluster, - OARCluster, - HTCondorCluster, -] - - -def create_cluster_func(cluster_cls, **kwargs): - if cluster_cls is HTCondorCluster: - # HTCondorCluster has a mandatory disk argument - kwargs.setdefault("disk", "1GB") - return cluster_cls(**kwargs) - def test_errors(): match = re.compile("Job type.*job_cls", flags=re.DOTALL) @@ -51,8 +28,8 @@ def test_errors(): JobQueueCluster(cores=4) -def test_command_template(): - with PBSCluster(cores=2, memory="4GB") as cluster: +def test_command_template(Cluster): + with Cluster(cores=2, memory="4GB") as cluster: assert ( "%s -m distributed.cli.dask_worker" % (sys.executable) in cluster._dummy_job._command_template @@ -61,7 +38,7 @@ def test_command_template(): assert " --memory-limit " in cluster._dummy_job._command_template assert " --name " in cluster._dummy_job._command_template - with PBSCluster( + with Cluster( cores=2, memory="4GB", death_timeout=60, @@ -73,35 +50,33 @@ def test_command_template(): assert " --preload mymodule" in cluster._dummy_job._command_template -@pytest.mark.parametrize("Cluster", all_clusters) -def test_shebang_settings(Cluster): - if Cluster is HTCondorCluster: - pytest.skip( - "HTCondorCluster has a peculiar submit script and does not have a shebang" +def test_shebang_settings(Cluster, request): + if Cluster is HTCondorCluster or Cluster is LocalCluster: + request.node.add_marker( + pytest.mark.xfail( + reason="%s has a peculiar submit script and does not have a shebang" + % type(Cluster).__name__ + ) ) default_shebang = "#!/usr/bin/env bash" python_shebang = "#!/usr/bin/python" - with create_cluster_func( - Cluster, cores=2, memory="4GB", shebang=python_shebang - ) as cluster: + with Cluster(cores=2, memory="4GB", shebang=python_shebang) as cluster: job_script = cluster.job_script() assert job_script.startswith(python_shebang) assert "bash" not in job_script - with create_cluster_func(Cluster, cores=2, memory="4GB") as cluster: + with Cluster(cores=2, memory="4GB") as cluster: job_script = cluster.job_script() assert job_script.startswith(default_shebang) -@pytest.mark.parametrize("Cluster", all_clusters) def test_dashboard_link(Cluster): - with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster: + with Cluster(cores=1, memory="1GB") as cluster: assert re.match(r"http://\d+\.\d+\.\d+.\d+:\d+/status", cluster.dashboard_link) -def test_forward_ip(): +def test_forward_ip(Cluster): ip = "127.0.0.1" - with PBSCluster( - walltime="00:02:00", + with Cluster( processes=4, cores=8, memory="28GB", @@ -111,9 +86,7 @@ def test_forward_ip(): assert cluster.scheduler.ip == ip default_ip = socket.gethostbyname("") - with PBSCluster( - walltime="00:02:00", processes=4, cores=8, memory="28GB", name="dask-worker" - ) as cluster: + with Cluster(processes=4, cores=8, memory="28GB", name="dask-worker") as cluster: assert cluster.scheduler.ip == default_ip @@ -132,7 +105,7 @@ def test_forward_ip(): def test_job_id_from_qsub_legacy(Cluster, qsub_return_string): original_job_id = "654321" qsub_return_string = qsub_return_string.format(job_id=original_job_id) - with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster: + with Cluster(cores=1, memory="1GB") as cluster: assert original_job_id == cluster._job_id_from_submit_output(qsub_return_string) @@ -158,13 +131,13 @@ def test_job_id_from_qsub(job_cls, qsub_return_string): @pytest.mark.parametrize("Cluster", []) def test_job_id_error_handling_legacy(Cluster): # non-matching regexp - with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster: + with Cluster(cores=1, memory="1GB") as cluster: with pytest.raises(ValueError, match="Could not parse job id"): return_string = "there is no number here" cluster._job_id_from_submit_output(return_string) # no job_id named group in the regexp - with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster: + with Cluster(cores=1, memory="1GB") as cluster: with pytest.raises(ValueError, match="You need to use a 'job_id' named group"): return_string = "Job <12345> submitted to ." cluster.job_id_regexp = r"(\d+)" @@ -187,18 +160,18 @@ def test_job_id_error_handling(job_cls): job._job_id_from_submit_output(return_string) -def test_log_directory(tmpdir): +def test_log_directory(Cluster, tmpdir): shutil.rmtree(tmpdir.strpath, ignore_errors=True) - with PBSCluster(cores=1, memory="1GB"): + with Cluster(cores=1, memory="1GB"): assert not os.path.exists(tmpdir.strpath) - with PBSCluster(cores=1, memory="1GB", log_directory=tmpdir.strpath): + with Cluster(cores=1, memory="1GB", log_directory=tmpdir.strpath): assert os.path.exists(tmpdir.strpath) @pytest.mark.skip -def test_jobqueue_cluster_call(tmpdir): - cluster = PBSCluster(cores=1, memory="1GB") +def test_jobqueue_cluster_call(tmpdir, Cluster): + cluster = Cluster(cores=1, memory="1GB") path = tmpdir.join("test.py") path.write('print("this is the stdout")') @@ -221,17 +194,16 @@ def test_jobqueue_cluster_call(tmpdir): cluster._call([sys.executable, path_with_error.strpath]) -@pytest.mark.parametrize("Cluster", all_clusters) def test_cluster_has_cores_and_memory(Cluster): base_regex = r"{}.+".format(Cluster.__name__) with pytest.raises(ValueError, match=base_regex + r"cores=\d, memory='\d+GB'"): - create_cluster_func(Cluster) + Cluster() with pytest.raises(ValueError, match=base_regex + r"cores=\d, memory='1GB'"): - create_cluster_func(Cluster, memory="1GB") + Cluster(memory="1GB") with pytest.raises(ValueError, match=base_regex + r"cores=4, memory='\d+GB'"): - create_cluster_func(Cluster, cores=4) + Cluster(cores=4) @pytest.mark.asyncio @@ -275,25 +247,22 @@ def __init__(self, *args, **kwargs): MyCluster(cores=1, memory="1GB") -@pytest.mark.parametrize("Cluster", all_clusters) def test_default_number_of_worker_processes(Cluster): - with create_cluster_func(Cluster, cores=4, memory="4GB") as cluster: + with Cluster(cores=4, memory="4GB") as cluster: assert " --nprocs 4" in cluster.job_script() assert " --nthreads 1" in cluster.job_script() - with create_cluster_func(Cluster, cores=6, memory="4GB") as cluster: + with Cluster(cores=6, memory="4GB") as cluster: assert " --nprocs 3" in cluster.job_script() assert " --nthreads 2" in cluster.job_script() -@pytest.mark.parametrize("Cluster", all_clusters) def test_scheduler_options(Cluster): net_if_addrs = psutil.net_if_addrs() interface = list(net_if_addrs.keys())[0] port = 8804 - with create_cluster_func( - Cluster, + with Cluster( cores=1, memory="1GB", scheduler_options={"interface": interface, "port": port}, @@ -303,23 +272,19 @@ def test_scheduler_options(Cluster): assert scheduler_options["port"] == port -@pytest.mark.parametrize("Cluster", all_clusters) def test_scheduler_options_interface(Cluster): net_if_addrs = psutil.net_if_addrs() scheduler_interface = list(net_if_addrs.keys())[0] worker_interface = "worker-interface" scheduler_host = socket.gethostname() - with create_cluster_func( - Cluster, cores=1, memory="1GB", interface=scheduler_interface - ) as cluster: + with Cluster(cores=1, memory="1GB", interface=scheduler_interface) as cluster: scheduler_options = cluster.scheduler_spec["options"] worker_options = cluster.new_spec["options"] assert scheduler_options["interface"] == scheduler_interface assert worker_options["interface"] == scheduler_interface - with create_cluster_func( - Cluster, + with Cluster( cores=1, memory="1GB", interface=worker_interface, @@ -330,8 +295,7 @@ def test_scheduler_options_interface(Cluster): assert scheduler_options["interface"] == scheduler_interface assert worker_options["interface"] == worker_interface - with create_cluster_func( - Cluster, + with Cluster( cores=1, memory="1GB", interface=worker_interface, @@ -343,25 +307,21 @@ def test_scheduler_options_interface(Cluster): assert worker_options["interface"] == worker_interface -@pytest.mark.parametrize("Cluster", all_clusters) def test_cluster_error_scheduler_arguments_should_use_scheduler_options(Cluster): scheduler_host = socket.gethostname() message_template = "pass {!r} through 'scheduler_options'" message = message_template.format("host") with pytest.raises(ValueError, match=message): - with create_cluster_func(Cluster, cores=1, memory="1GB", host=scheduler_host): + with Cluster(cores=1, memory="1GB", host=scheduler_host): pass message = message_template.format("dashboard_address") with pytest.raises(ValueError, match=message): - with create_cluster_func( - Cluster, cores=1, memory="1GB", dashboard_address=":8787" - ): + with Cluster(Cluster, cores=1, memory="1GB", dashboard_address=":8787"): pass -@pytest.mark.parametrize("Cluster", all_clusters) def test_import_scheduler_options_from_config(Cluster): net_if_addrs = psutil.net_if_addrs() @@ -382,13 +342,12 @@ def test_import_scheduler_options_from_config(Cluster): {"jobqueue.%s.scheduler-options" % default_config_name: scheduler_options} ): - with create_cluster_func(Cluster, cores=2, memory="2GB") as cluster: + with Cluster(cores=2, memory="2GB") as cluster: scheduler_options = cluster.scheduler_spec["options"] assert scheduler_options.get("interface") == config_scheduler_interface assert scheduler_options.get("port") == config_scheduler_port - with create_cluster_func( - Cluster, + with Cluster( cores=2, memory="2GB", scheduler_options={"interface": pass_scheduler_interface}, @@ -398,7 +357,6 @@ def test_import_scheduler_options_from_config(Cluster): assert scheduler_options.get("port") is None -@pytest.mark.parametrize("Cluster", all_clusters) def test_wrong_parameter_error(Cluster): match = re.compile( "unexpected keyword argument.+wrong_parameter.+" @@ -407,12 +365,12 @@ def test_wrong_parameter_error(Cluster): re.DOTALL, ) with pytest.raises(ValueError, match=match): - create_cluster_func( - Cluster, cores=1, memory="1GB", wrong_parameter="wrong_parameter_value" - ) + Cluster(cores=1, memory="1GB", wrong_parameter="wrong_parameter_value") -def test_security(): +@pytest.mark.xfail_env({"htcondor": "#535 no shared filesystem in htcondor ci"}) +@pytest.mark.xfail_env({"slurm": "#535 no shared filesystem in slurm ci"}) +def test_security(EnvSpecificCluster, loop): dirname = os.path.dirname(__file__) key = os.path.join(dirname, "key.pem") cert = os.path.join(dirname, "ca.pem") @@ -427,8 +385,12 @@ def test_security(): require_encryption=True, ) - with LocalCluster( - cores=1, memory="1GB", security=security, protocol="tls" + with EnvSpecificCluster( + cores=1, + memory="100MB", + security=security, + protocol="tls", + loop=loop, ) as cluster: assert cluster.security == security assert cluster.scheduler_spec["options"]["security"] == security @@ -439,10 +401,15 @@ def test_security(): assert "--tls-ca-file {}".format(cert) in job_script cluster.scale(jobs=1) + with Client(cluster, security=security) as client: future = client.submit(lambda x: x + 1, 10) - result = future.result() + result = future.result(timeout=30) assert result == 11 - with LocalCluster(cores=1, memory="1GB", security=security) as cluster: + with EnvSpecificCluster( + cores=1, + memory="100MB", + security=security, + ) as cluster: assert "tls://" in job_script