From c43a0bf0bcccfe177b02fffe89b54748ebf408a8 Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sun, 31 Oct 2021 16:49:29 +0100 Subject: [PATCH 1/5] rewrite testing using fixtures xfail security align LocalCluster api for unified testing move job unrelated test functions from test_job and to test_jobqueue_core for now --- conftest.py | 63 ++++++++++++++++-- dask_jobqueue/core.py | 3 +- dask_jobqueue/jobqueue.yaml | 1 + dask_jobqueue/local.py | 10 +-- dask_jobqueue/tests/test_job.py | 25 ------- dask_jobqueue/tests/test_jobqueue_core.py | 80 ++++++++--------------- 6 files changed, 93 insertions(+), 89 deletions(-) diff --git a/conftest.py b/conftest.py index a191a990..ce3e8e9b 100644 --- a/conftest.py +++ b/conftest.py @@ -7,6 +7,18 @@ import dask_jobqueue.lsf +from dask_jobqueue import ( + PBSCluster, + MoabCluster, + SLURMCluster, + SGECluster, + LSFCluster, + OARCluster, + HTCondorCluster, +) + +from dask_jobqueue.local import LocalCluster + def pytest_addoption(parser): parser.addoption( @@ -22,15 +34,29 @@ def pytest_configure(config): config.addinivalue_line( "markers", "env(NAME): only run test if environment NAME matches" ) + config.addinivalue_line("markers", "xfail_ci(NAME): know CI failure") def pytest_runtest_setup(item): - envnames = [mark.args[0] for mark in item.iter_markers(name="env")] - if (item.config.getoption("-E") is None and envnames) or ( - item.config.getoption("-E") is not None - and item.config.getoption("-E") not in envnames + env = item.config.getoption("-E") + envnames = sum( + [ + mark.args[0] if isinstance(mark.args[0], list) else [mark.args[0]] + for mark in item.iter_markers(name="env") + ], + [], + ) + if ( + None not in envnames + and (env is None and envnames) + or (env is not None and env not in envnames) ): pytest.skip("test requires env in %r" % envnames) + else: + xfail = {} + [xfail.update(mark.args[0]) for mark in item.iter_markers(name="xfail_ci")] + if env in xfail: + item.add_marker(pytest.mark.xfail(reason=xfail[env])) @pytest.fixture(autouse=True) @@ -46,3 +72,32 @@ def mock_lsf_version(monkeypatch, request): except OSError: # Provide a fake implementation of lsf_version() monkeypatch.setattr(dask_jobqueue.lsf, "lsf_version", lambda: "10") + + +all_envs = { + None: LocalCluster, + "pbs": PBSCluster, + "moab": MoabCluster, + "slurm": SLURMCluster, + "sge": SGECluster, + "lsf": LSFCluster, + "oar": OARCluster, + "htcondor": HTCondorCluster, +} + + +@pytest.fixture +def EnvSpecificCluster3(pytestconfig): + return all_envs[pytestconfig.getoption("-E")] + + +@pytest.fixture( + params=[pytest.param(v, marks=[pytest.mark.env(k)]) for (k, v) in all_envs.items()] +) +def EnvSpecificCluster(request): + return request.param + + +@pytest.fixture(params=list(all_envs.values())) +def Cluster(request): + return request.param diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 7a973882..09fcf61f 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -123,7 +123,7 @@ class Job(ProcessInterface, abc.ABC): %(job_header)s %(env_header)s -%(worker_command)s +exec %(worker_command)s """.lstrip() # Following class attributes should be overridden by extending classes. @@ -304,6 +304,7 @@ def job_file(self): with open(fn, "w") as f: logger.debug("writing job script: \n%s", self.job_script()) f.write(self.job_script()) + os.chmod(fn, 0o700) yield fn async def _submit_job(self, script_filename): diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index d4a76546..5bff5d44 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -195,6 +195,7 @@ jobqueue: death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR extra: [] + shebang: "#!/usr/bin/env bash" env-extra: [] job-extra: [] diff --git a/dask_jobqueue/local.py b/dask_jobqueue/local.py index cd87ce52..ce9202a6 100644 --- a/dask_jobqueue/local.py +++ b/dask_jobqueue/local.py @@ -38,11 +38,7 @@ def __init__( ): # Instantiate args and parameters from parent abstract class super().__init__( - scheduler=scheduler, - name=name, - config_name=config_name, - shebang="", - **kwargs + scheduler=scheduler, name=name, config_name=config_name, **kwargs ) # Declare class attribute that shall be overridden @@ -52,10 +48,8 @@ def __init__( async def _submit_job(self, script_filename): # Should we make this async friendly? - with open(script_filename) as f: - text = f.read().strip().split() self.process = Subprocess( - text, stdout=Subprocess.STREAM, stderr=Subprocess.STREAM + script_filename, stdout=Subprocess.STREAM, stderr=Subprocess.STREAM ) lines = [] diff --git a/dask_jobqueue/tests/test_job.py b/dask_jobqueue/tests/test_job.py index 1a2a00a7..68f28f87 100644 --- a/dask_jobqueue/tests/test_job.py +++ b/dask_jobqueue/tests/test_job.py @@ -1,15 +1,6 @@ import asyncio from time import time -from dask_jobqueue import ( - PBSCluster, - SGECluster, - SLURMCluster, - LSFCluster, - HTCondorCluster, - MoabCluster, - OARCluster, -) from dask_jobqueue.local import LocalJob, LocalCluster from dask_jobqueue.pbs import PBSJob from dask_jobqueue.sge import SGEJob @@ -41,16 +32,6 @@ def test_basic(): all_jobs = [SGEJob, PBSJob, SLURMJob, LSFJob, HTCondorJob, MoabJob, OARJob] -all_clusters = [ - SGECluster, - PBSCluster, - SLURMCluster, - LSFCluster, - HTCondorCluster, - MoabCluster, - OARCluster, - HTCondorCluster, -] @pytest.mark.parametrize("job_cls", job_protected) @@ -174,9 +155,3 @@ async def test_nprocs_scale(): assert len(cluster.worker_spec) == 3 cluster.scale(1) assert len(cluster.worker_spec) == 1 - - -@pytest.mark.parametrize("Cluster", all_clusters) -def test_docstring_cluster(Cluster): - assert "cores :" in Cluster.__doc__ - assert Cluster.__name__[: -len("Cluster")] in Cluster.__doc__ diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index b616392c..67df9bf0 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -15,11 +15,6 @@ from dask_jobqueue import ( JobQueueCluster, PBSCluster, - MoabCluster, - SLURMCluster, - SGECluster, - LSFCluster, - OARCluster, HTCondorCluster, ) from dask_jobqueue.core import Job @@ -27,15 +22,7 @@ from dask_jobqueue.sge import SGEJob -all_clusters = [ - PBSCluster, - MoabCluster, - SLURMCluster, - SGECluster, - LSFCluster, - OARCluster, - HTCondorCluster, -] +# all_clusters = list(all_envs.values()) def create_cluster_func(cluster_cls, **kwargs): @@ -51,8 +38,8 @@ def test_errors(): JobQueueCluster(cores=4) -def test_command_template(): - with PBSCluster(cores=2, memory="4GB") as cluster: +def test_command_template(Cluster): + with create_cluster_func(Cluster, cores=2, memory="4GB") as cluster: assert ( "%s -m distributed.cli.dask_worker" % (sys.executable) in cluster._dummy_job._command_template @@ -61,7 +48,8 @@ def test_command_template(): assert " --memory-limit " in cluster._dummy_job._command_template assert " --name " in cluster._dummy_job._command_template - with PBSCluster( + with create_cluster_func( + Cluster, cores=2, memory="4GB", death_timeout=60, @@ -73,7 +61,6 @@ def test_command_template(): assert " --preload mymodule" in cluster._dummy_job._command_template -@pytest.mark.parametrize("Cluster", all_clusters) def test_shebang_settings(Cluster): if Cluster is HTCondorCluster: pytest.skip( @@ -92,13 +79,12 @@ def test_shebang_settings(Cluster): assert job_script.startswith(default_shebang) -@pytest.mark.parametrize("Cluster", all_clusters) def test_dashboard_link(Cluster): with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster: assert re.match(r"http://\d+\.\d+\.\d+.\d+:\d+/status", cluster.dashboard_link) -def test_forward_ip(): +def test_forward_ip(Cluster): ip = "127.0.0.1" with PBSCluster( walltime="00:02:00", @@ -117,25 +103,6 @@ def test_forward_ip(): assert cluster.scheduler.ip == default_ip -@pytest.mark.parametrize("Cluster", []) -@pytest.mark.parametrize( - "qsub_return_string", - [ - "{job_id}.admin01", - "Request {job_id}.asdf was sumbitted to queue: standard.", - "sbatch: Submitted batch job {job_id}", - "{job_id};cluster", - "Job <{job_id}> is submitted to default queue .", - "{job_id}", - ], -) -def test_job_id_from_qsub_legacy(Cluster, qsub_return_string): - original_job_id = "654321" - qsub_return_string = qsub_return_string.format(job_id=original_job_id) - with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster: - assert original_job_id == cluster._job_id_from_submit_output(qsub_return_string) - - @pytest.mark.parametrize("job_cls", [SGEJob]) @pytest.mark.parametrize( "qsub_return_string", @@ -221,7 +188,6 @@ def test_jobqueue_cluster_call(tmpdir): cluster._call([sys.executable, path_with_error.strpath]) -@pytest.mark.parametrize("Cluster", all_clusters) def test_cluster_has_cores_and_memory(Cluster): base_regex = r"{}.+".format(Cluster.__name__) with pytest.raises(ValueError, match=base_regex + r"cores=\d, memory='\d+GB'"): @@ -275,7 +241,6 @@ def __init__(self, *args, **kwargs): MyCluster(cores=1, memory="1GB") -@pytest.mark.parametrize("Cluster", all_clusters) def test_default_number_of_worker_processes(Cluster): with create_cluster_func(Cluster, cores=4, memory="4GB") as cluster: assert " --nprocs 4" in cluster.job_script() @@ -286,7 +251,6 @@ def test_default_number_of_worker_processes(Cluster): assert " --nthreads 2" in cluster.job_script() -@pytest.mark.parametrize("Cluster", all_clusters) def test_scheduler_options(Cluster): net_if_addrs = psutil.net_if_addrs() interface = list(net_if_addrs.keys())[0] @@ -303,7 +267,6 @@ def test_scheduler_options(Cluster): assert scheduler_options["port"] == port -@pytest.mark.parametrize("Cluster", all_clusters) def test_scheduler_options_interface(Cluster): net_if_addrs = psutil.net_if_addrs() scheduler_interface = list(net_if_addrs.keys())[0] @@ -343,7 +306,6 @@ def test_scheduler_options_interface(Cluster): assert worker_options["interface"] == worker_interface -@pytest.mark.parametrize("Cluster", all_clusters) def test_cluster_error_scheduler_arguments_should_use_scheduler_options(Cluster): scheduler_host = socket.gethostname() message_template = "pass {!r} through 'scheduler_options'" @@ -361,7 +323,6 @@ def test_cluster_error_scheduler_arguments_should_use_scheduler_options(Cluster) pass -@pytest.mark.parametrize("Cluster", all_clusters) def test_import_scheduler_options_from_config(Cluster): net_if_addrs = psutil.net_if_addrs() @@ -398,7 +359,6 @@ def test_import_scheduler_options_from_config(Cluster): assert scheduler_options.get("port") is None -@pytest.mark.parametrize("Cluster", all_clusters) def test_wrong_parameter_error(Cluster): match = re.compile( "unexpected keyword argument.+wrong_parameter.+" @@ -412,7 +372,9 @@ def test_wrong_parameter_error(Cluster): ) -def test_security(): +@pytest.mark.xfail_ci({"htcondor": "no shared filesystem in htcondor ci"}) +@pytest.mark.xfail_ci({"slurm": "no shared filesystem in slurm ci"}) +def test_security(EnvSpecificCluster, loop): dirname = os.path.dirname(__file__) key = os.path.join(dirname, "key.pem") cert = os.path.join(dirname, "ca.pem") @@ -427,8 +389,13 @@ def test_security(): require_encryption=True, ) - with LocalCluster( - cores=1, memory="1GB", security=security, protocol="tls" + with create_cluster_func( + EnvSpecificCluster, + cores=1, + memory="100MB", + security=security, + protocol="tls", + loop=loop, ) as cluster: assert cluster.security == security assert cluster.scheduler_spec["options"]["security"] == security @@ -439,10 +406,21 @@ def test_security(): assert "--tls-ca-file {}".format(cert) in job_script cluster.scale(jobs=1) + with Client(cluster, security=security) as client: future = client.submit(lambda x: x + 1, 10) - result = future.result() + result = future.result(timeout=30) assert result == 11 - with LocalCluster(cores=1, memory="1GB", security=security) as cluster: + with create_cluster_func( + EnvSpecificCluster, + cores=1, + memory="100MB", + security=security, + ) as cluster: assert "tls://" in job_script + + +def test_docstring_cluster(Cluster): + assert "cores :" in Cluster.__doc__ + assert Cluster.__name__[: -len("Cluster")] in Cluster.__doc__ From 827a687cb47380c49599302338b22cf68d8b4d1d Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sun, 7 Nov 2021 13:54:43 +0100 Subject: [PATCH 2/5] move more to fixture move htcondor defaults to fixture revert move of docstring check (to make diff minimal) test_job now uses fixture --- conftest.py | 5 ++ dask_jobqueue/tests/test_job.py | 47 +++++--------- dask_jobqueue/tests/test_jobqueue_core.py | 75 +++++++---------------- 3 files changed, 45 insertions(+), 82 deletions(-) diff --git a/conftest.py b/conftest.py index ce3e8e9b..1a87ce13 100644 --- a/conftest.py +++ b/conftest.py @@ -6,6 +6,7 @@ import pytest import dask_jobqueue.lsf +import dask from dask_jobqueue import ( PBSCluster, @@ -95,9 +96,13 @@ def EnvSpecificCluster3(pytestconfig): params=[pytest.param(v, marks=[pytest.mark.env(k)]) for (k, v) in all_envs.items()] ) def EnvSpecificCluster(request): + if request.param == HTCondorCluster: + dask.config.set({"jobqueue.htcondor.disk": "20MB"}) return request.param @pytest.fixture(params=list(all_envs.values())) def Cluster(request): + if request.param == HTCondorCluster: + dask.config.set({"jobqueue.htcondor.disk": "1GB"}) return request.param diff --git a/dask_jobqueue/tests/test_job.py b/dask_jobqueue/tests/test_job.py index 68f28f87..fa94febd 100644 --- a/dask_jobqueue/tests/test_job.py +++ b/dask_jobqueue/tests/test_job.py @@ -1,15 +1,8 @@ import asyncio from time import time -from dask_jobqueue.local import LocalJob, LocalCluster +from dask_jobqueue.local import LocalCluster from dask_jobqueue.pbs import PBSJob -from dask_jobqueue.sge import SGEJob -from dask_jobqueue.slurm import SLURMJob -from dask_jobqueue.lsf import LSFJob -from dask_jobqueue.moab import MoabJob -from dask_jobqueue.htcondor import HTCondorJob -from dask_jobqueue.oar import OARJob - from dask_jobqueue.core import JobQueueCluster from dask.distributed import Scheduler, Client from distributed.core import Status @@ -17,26 +10,15 @@ import pytest -def test_basic(): - job = PBSJob(scheduler="127.0.0.1:12345", cores=1, memory="1 GB") +def test_basic(Cluster): + job_cls = Cluster.job_cls + job = job_cls(scheduler="127.0.0.1:12345", cores=1, memory="1 GB") assert "127.0.0.1:12345" in job.job_script() -job_protected = [ - pytest.param(SGEJob, marks=[pytest.mark.env("sge")]), - pytest.param(PBSJob, marks=[pytest.mark.env("pbs")]), - pytest.param(SLURMJob, marks=[pytest.mark.env("slurm")]), - pytest.param(LSFJob, marks=[pytest.mark.env("lsf")]), - LocalJob, -] - - -all_jobs = [SGEJob, PBSJob, SLURMJob, LSFJob, HTCondorJob, MoabJob, OARJob] - - -@pytest.mark.parametrize("job_cls", job_protected) @pytest.mark.asyncio -async def test_job(job_cls): +async def test_job(EnvSpecificCluster): + job_cls = EnvSpecificCluster.job_cls async with Scheduler(port=0) as s: job = job_cls(scheduler=s.address, name="foo", cores=1, memory="1GB") job = await job @@ -52,9 +34,9 @@ async def test_job(job_cls): assert time() < start + 10 -@pytest.mark.parametrize("job_cls", job_protected) @pytest.mark.asyncio -async def test_cluster(job_cls): +async def test_cluster(EnvSpecificCluster): + job_cls = EnvSpecificCluster.job_cls async with JobQueueCluster( 1, cores=1, memory="1GB", job_cls=job_cls, asynchronous=True, name="foo" ) as cluster: @@ -75,9 +57,9 @@ async def test_cluster(job_cls): assert time() < start + 10 -@pytest.mark.parametrize("job_cls", job_protected) @pytest.mark.asyncio -async def test_adapt(job_cls): +async def test_adapt(EnvSpecificCluster): + job_cls = EnvSpecificCluster.job_cls async with JobQueueCluster( 1, cores=1, memory="1GB", job_cls=job_cls, asynchronous=True, name="foo" ) as cluster: @@ -105,9 +87,9 @@ async def test_adapt(job_cls): assert not cluster.workers -@pytest.mark.parametrize("job_cls", job_protected) @pytest.mark.asyncio -async def test_adapt_parameters(job_cls): +async def test_adapt_parameters(EnvSpecificCluster): + job_cls = EnvSpecificCluster.job_cls async with JobQueueCluster( cores=2, memory="1GB", processes=2, job_cls=job_cls, asynchronous=True ) as cluster: @@ -155,3 +137,8 @@ async def test_nprocs_scale(): assert len(cluster.worker_spec) == 3 cluster.scale(1) assert len(cluster.worker_spec) == 1 + + +def test_docstring_cluster(Cluster): + assert "cores :" in Cluster.__doc__ + assert Cluster.__name__[: -len("Cluster")] in Cluster.__doc__ diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 67df9bf0..bc4e66e5 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -22,15 +22,6 @@ from dask_jobqueue.sge import SGEJob -# all_clusters = list(all_envs.values()) - - -def create_cluster_func(cluster_cls, **kwargs): - if cluster_cls is HTCondorCluster: - # HTCondorCluster has a mandatory disk argument - kwargs.setdefault("disk", "1GB") - return cluster_cls(**kwargs) - def test_errors(): match = re.compile("Job type.*job_cls", flags=re.DOTALL) @@ -39,7 +30,7 @@ def test_errors(): def test_command_template(Cluster): - with create_cluster_func(Cluster, cores=2, memory="4GB") as cluster: + with Cluster(cores=2, memory="4GB") as cluster: assert ( "%s -m distributed.cli.dask_worker" % (sys.executable) in cluster._dummy_job._command_template @@ -48,8 +39,7 @@ def test_command_template(Cluster): assert " --memory-limit " in cluster._dummy_job._command_template assert " --name " in cluster._dummy_job._command_template - with create_cluster_func( - Cluster, + with Cluster( cores=2, memory="4GB", death_timeout=60, @@ -68,19 +58,17 @@ def test_shebang_settings(Cluster): ) default_shebang = "#!/usr/bin/env bash" python_shebang = "#!/usr/bin/python" - with create_cluster_func( - Cluster, cores=2, memory="4GB", shebang=python_shebang - ) as cluster: + with Cluster(cores=2, memory="4GB", shebang=python_shebang) as cluster: job_script = cluster.job_script() assert job_script.startswith(python_shebang) assert "bash" not in job_script - with create_cluster_func(Cluster, cores=2, memory="4GB") as cluster: + with Cluster(cores=2, memory="4GB") as cluster: job_script = cluster.job_script() assert job_script.startswith(default_shebang) def test_dashboard_link(Cluster): - with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster: + with Cluster(cores=1, memory="1GB") as cluster: assert re.match(r"http://\d+\.\d+\.\d+.\d+:\d+/status", cluster.dashboard_link) @@ -125,13 +113,13 @@ def test_job_id_from_qsub(job_cls, qsub_return_string): @pytest.mark.parametrize("Cluster", []) def test_job_id_error_handling_legacy(Cluster): # non-matching regexp - with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster: + with Cluster(cores=1, memory="1GB") as cluster: with pytest.raises(ValueError, match="Could not parse job id"): return_string = "there is no number here" cluster._job_id_from_submit_output(return_string) # no job_id named group in the regexp - with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster: + with Cluster(cores=1, memory="1GB") as cluster: with pytest.raises(ValueError, match="You need to use a 'job_id' named group"): return_string = "Job <12345> submitted to ." cluster.job_id_regexp = r"(\d+)" @@ -191,13 +179,13 @@ def test_jobqueue_cluster_call(tmpdir): def test_cluster_has_cores_and_memory(Cluster): base_regex = r"{}.+".format(Cluster.__name__) with pytest.raises(ValueError, match=base_regex + r"cores=\d, memory='\d+GB'"): - create_cluster_func(Cluster) + Cluster() with pytest.raises(ValueError, match=base_regex + r"cores=\d, memory='1GB'"): - create_cluster_func(Cluster, memory="1GB") + Cluster(memory="1GB") with pytest.raises(ValueError, match=base_regex + r"cores=4, memory='\d+GB'"): - create_cluster_func(Cluster, cores=4) + Cluster(cores=4) @pytest.mark.asyncio @@ -242,11 +230,11 @@ def __init__(self, *args, **kwargs): def test_default_number_of_worker_processes(Cluster): - with create_cluster_func(Cluster, cores=4, memory="4GB") as cluster: + with Cluster(cores=4, memory="4GB") as cluster: assert " --nprocs 4" in cluster.job_script() assert " --nthreads 1" in cluster.job_script() - with create_cluster_func(Cluster, cores=6, memory="4GB") as cluster: + with Cluster(cores=6, memory="4GB") as cluster: assert " --nprocs 3" in cluster.job_script() assert " --nthreads 2" in cluster.job_script() @@ -256,8 +244,7 @@ def test_scheduler_options(Cluster): interface = list(net_if_addrs.keys())[0] port = 8804 - with create_cluster_func( - Cluster, + with Cluster( cores=1, memory="1GB", scheduler_options={"interface": interface, "port": port}, @@ -273,16 +260,13 @@ def test_scheduler_options_interface(Cluster): worker_interface = "worker-interface" scheduler_host = socket.gethostname() - with create_cluster_func( - Cluster, cores=1, memory="1GB", interface=scheduler_interface - ) as cluster: + with Cluster(cores=1, memory="1GB", interface=scheduler_interface) as cluster: scheduler_options = cluster.scheduler_spec["options"] worker_options = cluster.new_spec["options"] assert scheduler_options["interface"] == scheduler_interface assert worker_options["interface"] == scheduler_interface - with create_cluster_func( - Cluster, + with Cluster( cores=1, memory="1GB", interface=worker_interface, @@ -293,8 +277,7 @@ def test_scheduler_options_interface(Cluster): assert scheduler_options["interface"] == scheduler_interface assert worker_options["interface"] == worker_interface - with create_cluster_func( - Cluster, + with Cluster( cores=1, memory="1GB", interface=worker_interface, @@ -312,14 +295,12 @@ def test_cluster_error_scheduler_arguments_should_use_scheduler_options(Cluster) message = message_template.format("host") with pytest.raises(ValueError, match=message): - with create_cluster_func(Cluster, cores=1, memory="1GB", host=scheduler_host): + with Cluster(cores=1, memory="1GB", host=scheduler_host): pass message = message_template.format("dashboard_address") with pytest.raises(ValueError, match=message): - with create_cluster_func( - Cluster, cores=1, memory="1GB", dashboard_address=":8787" - ): + with Cluster(Cluster, cores=1, memory="1GB", dashboard_address=":8787"): pass @@ -343,13 +324,12 @@ def test_import_scheduler_options_from_config(Cluster): {"jobqueue.%s.scheduler-options" % default_config_name: scheduler_options} ): - with create_cluster_func(Cluster, cores=2, memory="2GB") as cluster: + with Cluster(cores=2, memory="2GB") as cluster: scheduler_options = cluster.scheduler_spec["options"] assert scheduler_options.get("interface") == config_scheduler_interface assert scheduler_options.get("port") == config_scheduler_port - with create_cluster_func( - Cluster, + with Cluster( cores=2, memory="2GB", scheduler_options={"interface": pass_scheduler_interface}, @@ -367,9 +347,7 @@ def test_wrong_parameter_error(Cluster): re.DOTALL, ) with pytest.raises(ValueError, match=match): - create_cluster_func( - Cluster, cores=1, memory="1GB", wrong_parameter="wrong_parameter_value" - ) + Cluster(cores=1, memory="1GB", wrong_parameter="wrong_parameter_value") @pytest.mark.xfail_ci({"htcondor": "no shared filesystem in htcondor ci"}) @@ -389,8 +367,7 @@ def test_security(EnvSpecificCluster, loop): require_encryption=True, ) - with create_cluster_func( - EnvSpecificCluster, + with EnvSpecificCluster( cores=1, memory="100MB", security=security, @@ -412,15 +389,9 @@ def test_security(EnvSpecificCluster, loop): result = future.result(timeout=30) assert result == 11 - with create_cluster_func( - EnvSpecificCluster, + with EnvSpecificCluster( cores=1, memory="100MB", security=security, ) as cluster: assert "tls://" in job_script - - -def test_docstring_cluster(Cluster): - assert "cores :" in Cluster.__doc__ - assert Cluster.__name__[: -len("Cluster")] in Cluster.__doc__ From 76560841993448c9fe5c462e5ba7017c492f1ec6 Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Tue, 16 Nov 2021 01:26:10 +0100 Subject: [PATCH 3/5] revert unrelated removal add link to issue in xfail --- dask_jobqueue/tests/test_jobqueue_core.py | 41 ++++++++++++++++------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index bc4e66e5..c60e7c9f 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -14,7 +14,6 @@ from dask_jobqueue import ( JobQueueCluster, - PBSCluster, HTCondorCluster, ) from dask_jobqueue.core import Job @@ -74,8 +73,7 @@ def test_dashboard_link(Cluster): def test_forward_ip(Cluster): ip = "127.0.0.1" - with PBSCluster( - walltime="00:02:00", + with Cluster( processes=4, cores=8, memory="28GB", @@ -85,12 +83,29 @@ def test_forward_ip(Cluster): assert cluster.scheduler.ip == ip default_ip = socket.gethostbyname("") - with PBSCluster( - walltime="00:02:00", processes=4, cores=8, memory="28GB", name="dask-worker" - ) as cluster: + with Cluster(processes=4, cores=8, memory="28GB", name="dask-worker") as cluster: assert cluster.scheduler.ip == default_ip +@pytest.mark.parametrize("Cluster", []) +@pytest.mark.parametrize( + "qsub_return_string", + [ + "{job_id}.admin01", + "Request {job_id}.asdf was sumbitted to queue: standard.", + "sbatch: Submitted batch job {job_id}", + "{job_id};cluster", + "Job <{job_id}> is submitted to default queue .", + "{job_id}", + ], +) +def test_job_id_from_qsub_legacy(Cluster, qsub_return_string): + original_job_id = "654321" + qsub_return_string = qsub_return_string.format(job_id=original_job_id) + with Cluster(cores=1, memory="1GB") as cluster: + assert original_job_id == cluster._job_id_from_submit_output(qsub_return_string) + + @pytest.mark.parametrize("job_cls", [SGEJob]) @pytest.mark.parametrize( "qsub_return_string", @@ -142,18 +157,18 @@ def test_job_id_error_handling(job_cls): job._job_id_from_submit_output(return_string) -def test_log_directory(tmpdir): +def test_log_directory(Cluster, tmpdir): shutil.rmtree(tmpdir.strpath, ignore_errors=True) - with PBSCluster(cores=1, memory="1GB"): + with Cluster(cores=1, memory="1GB"): assert not os.path.exists(tmpdir.strpath) - with PBSCluster(cores=1, memory="1GB", log_directory=tmpdir.strpath): + with Cluster(cores=1, memory="1GB", log_directory=tmpdir.strpath): assert os.path.exists(tmpdir.strpath) @pytest.mark.skip -def test_jobqueue_cluster_call(tmpdir): - cluster = PBSCluster(cores=1, memory="1GB") +def test_jobqueue_cluster_call(tmpdir, Cluster): + cluster = Cluster(cores=1, memory="1GB") path = tmpdir.join("test.py") path.write('print("this is the stdout")') @@ -350,8 +365,8 @@ def test_wrong_parameter_error(Cluster): Cluster(cores=1, memory="1GB", wrong_parameter="wrong_parameter_value") -@pytest.mark.xfail_ci({"htcondor": "no shared filesystem in htcondor ci"}) -@pytest.mark.xfail_ci({"slurm": "no shared filesystem in slurm ci"}) +@pytest.mark.xfail_ci({"htcondor": "#535 no shared filesystem in htcondor ci"}) +@pytest.mark.xfail_ci({"slurm": "#535 no shared filesystem in slurm ci"}) def test_security(EnvSpecificCluster, loop): dirname = os.path.dirname(__file__) key = os.path.join(dirname, "key.pem") From 8627a23498a764bd1111d731d7a7cd81db7bc78d Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Mon, 22 Nov 2021 23:09:45 +0100 Subject: [PATCH 4/5] revert changes to local.py and core.py document fixtures remove leftovers and inconsistencies rename xfail_ci to xfail_env for consistency --- conftest.py | 17 ++++++++--------- dask_jobqueue/core.py | 3 +-- dask_jobqueue/jobqueue.yaml | 1 - dask_jobqueue/local.py | 10 ++++++++-- dask_jobqueue/tests/test_jobqueue_core.py | 15 +++++++++------ 5 files changed, 26 insertions(+), 20 deletions(-) diff --git a/conftest.py b/conftest.py index 1a87ce13..7e8f176d 100644 --- a/conftest.py +++ b/conftest.py @@ -31,11 +31,13 @@ def pytest_addoption(parser): def pytest_configure(config): - # register an additional marker + # register additional markers config.addinivalue_line( "markers", "env(NAME): only run test if environment NAME matches" ) - config.addinivalue_line("markers", "xfail_ci(NAME): know CI failure") + config.addinivalue_line( + "markers", "xfail_env(NAME): known failure for environment NAME" + ) def pytest_runtest_setup(item): @@ -55,7 +57,7 @@ def pytest_runtest_setup(item): pytest.skip("test requires env in %r" % envnames) else: xfail = {} - [xfail.update(mark.args[0]) for mark in item.iter_markers(name="xfail_ci")] + [xfail.update(mark.args[0]) for mark in item.iter_markers(name="xfail_env")] if env in xfail: item.add_marker(pytest.mark.xfail(reason=xfail[env])) @@ -87,22 +89,19 @@ def mock_lsf_version(monkeypatch, request): } -@pytest.fixture -def EnvSpecificCluster3(pytestconfig): - return all_envs[pytestconfig.getoption("-E")] - - @pytest.fixture( params=[pytest.param(v, marks=[pytest.mark.env(k)]) for (k, v) in all_envs.items()] ) def EnvSpecificCluster(request): + """Run test in only with the specific cluster class set by the environment""" if request.param == HTCondorCluster: - dask.config.set({"jobqueue.htcondor.disk": "20MB"}) + dask.config.set({"jobqueue.htcondor.disk": "1GB"}) return request.param @pytest.fixture(params=list(all_envs.values())) def Cluster(request): + """Run for each cluster class independent when no environment is set""" if request.param == HTCondorCluster: dask.config.set({"jobqueue.htcondor.disk": "1GB"}) return request.param diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 09fcf61f..7a973882 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -123,7 +123,7 @@ class Job(ProcessInterface, abc.ABC): %(job_header)s %(env_header)s -exec %(worker_command)s +%(worker_command)s """.lstrip() # Following class attributes should be overridden by extending classes. @@ -304,7 +304,6 @@ def job_file(self): with open(fn, "w") as f: logger.debug("writing job script: \n%s", self.job_script()) f.write(self.job_script()) - os.chmod(fn, 0o700) yield fn async def _submit_job(self, script_filename): diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index 5bff5d44..d4a76546 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -195,7 +195,6 @@ jobqueue: death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR extra: [] - shebang: "#!/usr/bin/env bash" env-extra: [] job-extra: [] diff --git a/dask_jobqueue/local.py b/dask_jobqueue/local.py index ce9202a6..cd87ce52 100644 --- a/dask_jobqueue/local.py +++ b/dask_jobqueue/local.py @@ -38,7 +38,11 @@ def __init__( ): # Instantiate args and parameters from parent abstract class super().__init__( - scheduler=scheduler, name=name, config_name=config_name, **kwargs + scheduler=scheduler, + name=name, + config_name=config_name, + shebang="", + **kwargs ) # Declare class attribute that shall be overridden @@ -48,8 +52,10 @@ def __init__( async def _submit_job(self, script_filename): # Should we make this async friendly? + with open(script_filename) as f: + text = f.read().strip().split() self.process = Subprocess( - script_filename, stdout=Subprocess.STREAM, stderr=Subprocess.STREAM + text, stdout=Subprocess.STREAM, stderr=Subprocess.STREAM ) lines = [] diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index c60e7c9f..175d9438 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -50,10 +50,13 @@ def test_command_template(Cluster): assert " --preload mymodule" in cluster._dummy_job._command_template -def test_shebang_settings(Cluster): - if Cluster is HTCondorCluster: - pytest.skip( - "HTCondorCluster has a peculiar submit script and does not have a shebang" +def test_shebang_settings(Cluster, request): + if Cluster is HTCondorCluster or Cluster is LocalCluster: + request.node.add_marker( + pytest.mark.xfail( + reason="%s has a peculiar submit script and does not have a shebang" + % type(Cluster).__name__ + ) ) default_shebang = "#!/usr/bin/env bash" python_shebang = "#!/usr/bin/python" @@ -365,8 +368,8 @@ def test_wrong_parameter_error(Cluster): Cluster(cores=1, memory="1GB", wrong_parameter="wrong_parameter_value") -@pytest.mark.xfail_ci({"htcondor": "#535 no shared filesystem in htcondor ci"}) -@pytest.mark.xfail_ci({"slurm": "#535 no shared filesystem in slurm ci"}) +@pytest.mark.xfail_env({"htcondor": "#535 no shared filesystem in htcondor ci"}) +@pytest.mark.xfail_env({"slurm": "#535 no shared filesystem in slurm ci"}) def test_security(EnvSpecificCluster, loop): dirname = os.path.dirname(__file__) key = os.path.join(dirname, "key.pem") From e5b8f718bbb2ed5a870c2fa41d9f0e7720a6bf17 Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Mon, 22 Nov 2021 23:40:05 +0100 Subject: [PATCH 5/5] fix typos in docstring --- conftest.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/conftest.py b/conftest.py index 7e8f176d..34c44d8d 100644 --- a/conftest.py +++ b/conftest.py @@ -93,15 +93,17 @@ def mock_lsf_version(monkeypatch, request): params=[pytest.param(v, marks=[pytest.mark.env(k)]) for (k, v) in all_envs.items()] ) def EnvSpecificCluster(request): - """Run test in only with the specific cluster class set by the environment""" + """Run test only with the specific cluster class set by the environment""" if request.param == HTCondorCluster: + # HTCondor requires explicitly specifying requested disk space dask.config.set({"jobqueue.htcondor.disk": "1GB"}) return request.param @pytest.fixture(params=list(all_envs.values())) def Cluster(request): - """Run for each cluster class independent when no environment is set""" + """Run test for each cluster class when no environment is set (test should not require the actual scheduler)""" if request.param == HTCondorCluster: + # HTCondor requires explicitly specifying requested disk space dask.config.set({"jobqueue.htcondor.disk": "1GB"}) return request.param