From ee6fb77a64f74619cdb57e5ecbdeef013386c1c7 Mon Sep 17 00:00:00 2001 From: Stuart Berg Date: Wed, 23 Oct 2019 18:19:40 -0400 Subject: [PATCH] lsf: Allow use_stdin to be passed in via the LSFCluster constructor. --- conftest.py | 17 ++++++++++++++ dask_jobqueue/jobqueue.yaml | 2 +- dask_jobqueue/lsf.py | 35 +++++++++++++++++++++-------- dask_jobqueue/tests/test_lsf.py | 39 +++++++++++++++++++++++++++------ 4 files changed, 76 insertions(+), 17 deletions(-) diff --git a/conftest.py b/conftest.py index 6d85416f..751b4b15 100644 --- a/conftest.py +++ b/conftest.py @@ -5,6 +5,8 @@ import pytest +import dask_jobqueue.lsf + def pytest_addoption(parser): parser.addoption( @@ -27,3 +29,18 @@ def pytest_runtest_setup(item): if envnames: if item.config.getoption("-E") not in envnames: pytest.skip("test requires env in %r" % envnames) + + +@pytest.fixture(autouse=True) +def mock_lsf_version(monkeypatch, request): + # Monkey-patch lsf_version() UNLESS the 'lsf' environment is selected. + # In that case, the real lsf_version() function should work. + markers = list(request.node.iter_markers()) + if any("lsf" in marker.args for marker in markers): + return + + try: + dask_jobqueue.lsf.lsf_version() + except OSError: + # Provide a fake implementation of lsf_version() + monkeypatch.setattr(dask_jobqueue.lsf, "lsf_version", lambda: "10") diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index eda0237e..9e5e8afc 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -140,7 +140,7 @@ jobqueue: job-extra: [] log-directory: null lsf-units: null - use-stdin: null + use-stdin: null # (bool) How jobs are launched, i.e. 
'bsub jobscript.sh' or 'bsub < jobscript.sh' htcondor: name: dask-worker diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index f3ad6495..1dfda0ff 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -29,6 +29,7 @@ def __init__( job_extra=None, lsf_units=None, config_name="lsf", + use_stdin=None, **kwargs ): if queue is None: @@ -46,6 +47,12 @@ def __init__( if lsf_units is None: lsf_units = dask.config.get("jobqueue.%s.lsf-units" % config_name) + if use_stdin is None: + use_stdin = dask.config.get("jobqueue.%s.use-stdin" % config_name) + if use_stdin is None: + use_stdin = lsf_version() < "10" + self.use_stdin = use_stdin + # Instantiate args and parameters from parent abstract class super().__init__(*args, config_name=config_name, **kwargs) @@ -97,7 +104,7 @@ def __init__( logger.debug("Job script: \n %s" % self.job_script()) async def _submit_job(self, script_filename): - if use_stdin(): + if self.use_stdin: piped_cmd = [self.submit_command + "< " + script_filename + " 2> /dev/null"] return self._call(piped_cmd, shell=True) else: @@ -189,12 +196,29 @@ class LSFCluster(JobQueueCluster): lsf_units : str Unit system for large units in resource usage set by the LSF_UNIT_FOR_LIMITS in the lsf.conf file of a cluster. + use_stdin : bool + LSF's ``bsub`` command allows us to launch a job by passing it as an + argument (``bsub /tmp/jobscript.sh``) or feeding it to stdin + (``bsub < /tmp/jobscript.sh``). Depending on your cluster's configuration + and/or shared filesystem setup, one of those methods may not work, + forcing you to use the other one. This option controls which method + ``dask-jobqueue`` will use to submit jobs via ``bsub``. + + In particular, if your cluster fails to launch and the LSF log contains + an error message similar to the following: + + .. 
code-block:: + + /home/someuser/.lsbatch/1571869562.66512066: line 8: /tmp/tmpva_yau8m.sh: No such file or directory + + ...then try passing ``use_stdin=True`` here or setting ``use-stdin: true`` + in your ``jobqueue.lsf`` config section. Examples -------- >>> from dask_jobqueue import LSFCluster >>> cluster = LSFCluster(queue='general', project='DaskonLSF', - ... cores=15, memory='25GB') + ... cores=15, memory='25GB', use_stdin=True) >>> cluster.scale(jobs=10) # ask for 10 jobs >>> from dask.distributed import Client @@ -211,13 +235,6 @@ class LSFCluster(JobQueueCluster): config_name = "lsf" -def use_stdin(): - if dask.config.get("jobqueue.lsf.use-stdin") is not None: - return dask.config.get("jobqueue.lsf.use-stdin") - - return lsf_version() < "10" - - @toolz.memoize def lsf_version(): out, _ = subprocess.Popen("lsid", stdout=subprocess.PIPE).communicate() diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index a03d953f..381b0acb 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -217,11 +217,41 @@ def test_adaptive_grouped(loop): def test_config(loop): with dask.config.set( - {"jobqueue.lsf.walltime": "00:02", "jobqueue.lsf.local-directory": "/foo"} + { + "jobqueue.lsf.walltime": "00:02", + "jobqueue.lsf.local-directory": "/foo", + "jobqueue.lsf.use-stdin": True, + } ): with LSFCluster(loop=loop, cores=1, memory="2GB") as cluster: assert "00:02" in cluster.job_script() assert "--local-directory /foo" in cluster.job_script() + assert cluster._dummy_job.use_stdin + + +@pytest.mark.parametrize( + "config_value,constructor_value", + [ + (None, False), + (None, True), + (True, None), + (False, None), + (True, False), # Constructor overrides config + ], +) +def test_use_stdin(loop, config_value, constructor_value): + """ + Verify that use-stdin is respected when passed via the + config OR the LSFCluster() constructor + """ + with dask.config.set({"jobqueue.lsf.use-stdin": config_value}): + with LSFCluster( 
+ loop=loop, cores=1, memory="2GB", use_stdin=constructor_value + ) as cluster: + if constructor_value is not None: + assert cluster._dummy_job.use_stdin == constructor_value + else: + assert cluster._dummy_job.use_stdin == config_value def test_config_name_lsf_takes_custom_config(): @@ -244,6 +274,7 @@ def test_config_name_lsf_takes_custom_config(): "env-extra": [], "log-directory": None, "shebang": "#!/usr/bin/env bash", + "use-stdin": None, } with dask.config.set({"jobqueue.lsf-config-name": conf}): @@ -306,9 +337,3 @@ def test_lsf_unit_detection(lsf_units_string, expected_unit): def test_lsf_unit_detection_without_file(): lsf_unit_detection_helper("kb", conf_text=None) - - -@pytest.mark.parametrize("stdin", [True, False]) -def test_stdin(stdin): - with dask.config.set({"jobqueue.lsf.use-stdin": stdin}): - assert lsf.use_stdin() is stdin