diff --git a/dask_jobqueue/tests/test_job.py b/dask_jobqueue/tests/test_job.py index 71f26f09..c4d7b25a 100644 --- a/dask_jobqueue/tests/test_job.py +++ b/dask_jobqueue/tests/test_job.py @@ -48,6 +48,7 @@ def test_basic(): HTCondorCluster, MoabCluster, OARCluster, + HTCondorCluster, ] diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index db44002b..39d97831 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -17,12 +17,30 @@ SGECluster, LSFCluster, OARCluster, + HTCondorCluster, ) from dask_jobqueue.core import Job from dask_jobqueue.local import LocalCluster from dask_jobqueue.sge import SGEJob +all_clusters = [ + PBSCluster, + MoabCluster, + SLURMCluster, + SGECluster, + LSFCluster, + OARCluster, + HTCondorCluster, +] + + +def create_cluster_func(cluster_cls, **kwargs): + if cluster_cls is HTCondorCluster: + # HTCondorCluster has a mandatory disk argument + kwargs.setdefault("disk", "1GB") + return cluster_cls(**kwargs) + def test_errors(): match = re.compile("Job type.*job_cls", flags=re.DOTALL) @@ -52,27 +70,28 @@ def test_command_template(): assert " --preload mymodule" in cluster._dummy_job._command_template -@pytest.mark.parametrize( - "Cluster", - [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster], -) +@pytest.mark.parametrize("Cluster", all_clusters) def test_shebang_settings(Cluster): + if Cluster is HTCondorCluster: + pytest.skip( + "HTCondorCluster has a peculiar submit script and does not have a shebang" + ) default_shebang = "#!/usr/bin/env bash" python_shebang = "#!/usr/bin/python" - with Cluster(cores=2, memory="4GB", shebang=python_shebang) as cluster: + with create_cluster_func( + Cluster, cores=2, memory="4GB", shebang=python_shebang + ) as cluster: job_script = cluster.job_script() assert job_script.startswith(python_shebang) assert "bash" not in job_script - with Cluster(cores=2, memory="4GB") as cluster: + with create_cluster_func(Cluster, cores=2, memory="4GB") as cluster: job_script = cluster.job_script() assert job_script.startswith(default_shebang) -@pytest.mark.parametrize( - "Cluster", [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster] -) +@pytest.mark.parametrize("Cluster", all_clusters) def test_dashboard_link(Cluster): - with Cluster(cores=1, memory="1GB") as cluster: + with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster: assert re.match(r"http://\d+\.\d+\.\d+.\d+:\d+/status", cluster.dashboard_link) @@ -110,7 +129,7 @@ def test_forward_ip(): def test_job_id_from_qsub_legacy(Cluster, qsub_return_string): original_job_id = "654321" qsub_return_string = qsub_return_string.format(job_id=original_job_id) - with Cluster(cores=1, memory="1GB") as cluster: + with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster: assert original_job_id == cluster._job_id_from_submit_output(qsub_return_string) @@ -136,13 +155,13 @@ def test_job_id_from_qsub(job_cls, qsub_return_string): @pytest.mark.parametrize("Cluster", []) def test_job_id_error_handling_legacy(Cluster): # non-matching regexp - with Cluster(cores=1, memory="1GB") as cluster: + with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster: with pytest.raises(ValueError, match="Could not parse job id"): return_string = "there is no number here" cluster._job_id_from_submit_output(return_string) # no job_id named group in the regexp - with Cluster(cores=1, memory="1GB") as cluster: + with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster: with pytest.raises(ValueError, match="You need to use a 'job_id' named group"): return_string = "Job <12345> submitted to ." cluster.job_id_regexp = r"(\d+)" @@ -199,20 +218,17 @@ def test_jobqueue_cluster_call(tmpdir): cluster._call([sys.executable, path_with_error.strpath]) -@pytest.mark.parametrize( - "Cluster", - [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster], -) +@pytest.mark.parametrize("Cluster", all_clusters) def test_cluster_has_cores_and_memory(Cluster): base_regex = r"{}.+".format(Cluster.__name__) with pytest.raises(ValueError, match=base_regex + r"cores=\d, memory='\d+GB'"): - Cluster() + create_cluster_func(Cluster) with pytest.raises(ValueError, match=base_regex + r"cores=\d, memory='1GB'"): - Cluster(memory="1GB") + create_cluster_func(Cluster, memory="1GB") with pytest.raises(ValueError, match=base_regex + r"cores=4, memory='\d+GB'"): - Cluster(cores=4) + create_cluster_func(Cluster, cores=4) @pytest.mark.asyncio @@ -256,54 +272,51 @@ def __init__(self, *args, **kwargs): MyCluster(cores=1, memory="1GB") -@pytest.mark.parametrize( - "Cluster", - [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster], -) +@pytest.mark.parametrize("Cluster", all_clusters) def test_default_number_of_worker_processes(Cluster): - with Cluster(cores=4, memory="4GB") as cluster: + with create_cluster_func(Cluster, cores=4, memory="4GB") as cluster: assert " --nprocs 4" in cluster.job_script() assert " --nthreads 1" in cluster.job_script() - with Cluster(cores=6, memory="4GB") as cluster: + with create_cluster_func(Cluster, cores=6, memory="4GB") as cluster: assert " --nprocs 3" in cluster.job_script() assert " --nthreads 2" in cluster.job_script() -@pytest.mark.parametrize( - "Cluster", - [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster], -) +@pytest.mark.parametrize("Cluster", all_clusters) def test_scheduler_options(Cluster): net_if_addrs = psutil.net_if_addrs() interface = list(net_if_addrs.keys())[0] port = 8804 - with Cluster( - cores=1, memory="1GB", scheduler_options={"interface": interface, "port": port} + with create_cluster_func( + Cluster, + cores=1, + memory="1GB", + scheduler_options={"interface": interface, "port": port}, ) as cluster: scheduler_options = cluster.scheduler_spec["options"] assert scheduler_options["interface"] == interface assert scheduler_options["port"] == port -@pytest.mark.parametrize( - "Cluster", - [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster], -) +@pytest.mark.parametrize("Cluster", all_clusters) def test_scheduler_options_interface(Cluster): net_if_addrs = psutil.net_if_addrs() scheduler_interface = list(net_if_addrs.keys())[0] worker_interface = "worker-interface" scheduler_host = socket.gethostname() - with Cluster(cores=1, memory="1GB", interface=scheduler_interface) as cluster: + with create_cluster_func( + Cluster, cores=1, memory="1GB", interface=scheduler_interface + ) as cluster: scheduler_options = cluster.scheduler_spec["options"] worker_options = cluster.new_spec["options"] assert scheduler_options["interface"] == scheduler_interface assert worker_options["interface"] == scheduler_interface - with Cluster( + with create_cluster_func( + Cluster, cores=1, memory="1GB", interface=worker_interface, @@ -314,7 +327,8 @@ def test_scheduler_options_interface(Cluster): assert scheduler_options["interface"] == scheduler_interface assert worker_options["interface"] == worker_interface - with Cluster( + with create_cluster_func( + Cluster, cores=1, memory="1GB", interface=worker_interface, @@ -326,29 +340,25 @@ def test_scheduler_options_interface(Cluster): assert worker_options["interface"] == worker_interface -@pytest.mark.parametrize( - "Cluster", - [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster], -) +@pytest.mark.parametrize("Cluster", all_clusters) def test_cluster_error_scheduler_arguments_should_use_scheduler_options(Cluster): scheduler_host = socket.gethostname() message_template = "pass {!r} through 'scheduler_options'" message = message_template.format("host") with pytest.raises(ValueError, match=message): - with Cluster(cores=1, memory="1GB", host=scheduler_host): + with create_cluster_func(Cluster, cores=1, memory="1GB", host=scheduler_host): pass message = message_template.format("dashboard_address") with pytest.raises(ValueError, match=message): - with Cluster(cores=1, memory="1GB", dashboard_address=":8787"): + with create_cluster_func( + Cluster, cores=1, memory="1GB", dashboard_address=":8787" + ): pass -@pytest.mark.parametrize( - "Cluster", - [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster], -) +@pytest.mark.parametrize("Cluster", all_clusters) def test_import_scheduler_options_from_config(Cluster): net_if_addrs = psutil.net_if_addrs() @@ -369,12 +379,13 @@ def test_import_scheduler_options_from_config(Cluster): {"jobqueue.%s.scheduler-options" % default_config_name: scheduler_options} ): - with Cluster(cores=2, memory="2GB") as cluster: + with create_cluster_func(Cluster, cores=2, memory="2GB") as cluster: scheduler_options = cluster.scheduler_spec["options"] assert scheduler_options.get("interface") == config_scheduler_interface assert scheduler_options.get("port") == config_scheduler_port - with Cluster( + with create_cluster_func( + Cluster, cores=2, memory="2GB", scheduler_options={"interface": pass_scheduler_interface},