diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index d2174258..eda0237e 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -140,6 +140,7 @@ jobqueue: job-extra: [] log-directory: null lsf-units: null + use-stdin: null htcondor: name: dask-worker diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 348a356d..f3ad6495 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -1,6 +1,11 @@ +from distutils.version import LooseVersion + import logging import math import os +import re +import subprocess +import toolz import dask @@ -91,6 +96,14 @@ def __init__( logger.debug("Job script: \n %s" % self.job_script()) + async def _submit_job(self, script_filename): + if use_stdin(): + piped_cmd = [self.submit_command + "< " + script_filename + " 2> /dev/null"] + return self._call(piped_cmd, shell=True) + else: + result = await super()._submit_job(script_filename) + return result + def lsf_format_bytes_ceil(n, lsf_units="mb"): """ Format bytes as text @@ -196,3 +209,17 @@ class LSFCluster(JobQueueCluster): ) job_cls = LSFJob config_name = "lsf" + + +def use_stdin(): + if dask.config.get("jobqueue.lsf.use-stdin") is not None: + return dask.config.get("jobqueue.lsf.use-stdin") + + return lsf_version() < "10" + + +@toolz.memoize +def lsf_version(): + out, _ = subprocess.Popen("lsid", stdout=subprocess.PIPE).communicate() + version = re.search(r"(\d+\.)+\d+", out.decode()).group() + return LooseVersion(version) diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index 7915a1bb..a03d953f 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -306,3 +306,9 @@ def test_lsf_unit_detection(lsf_units_string, expected_unit): def test_lsf_unit_detection_without_file(): lsf_unit_detection_helper("kb", conf_text=None) + + +@pytest.mark.parametrize("stdin", [True, False]) +def test_stdin(stdin): + with dask.config.set({"jobqueue.lsf.use-stdin": stdin}): + assert lsf.use_stdin() is stdin