From 4b302407ab26231cba4981bf49155bd807ae2104 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Fri, 4 Oct 2019 09:57:51 -0500 Subject: [PATCH 1/4] Support stdin with LSF bsub --- dask_jobqueue/jobqueue.yaml | 1 + dask_jobqueue/lsf.py | 26 ++++++++++++++++++++++++++ dask_jobqueue/tests/test_lsf.py | 6 ++++++ 3 files changed, 33 insertions(+) diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index d2174258..eda0237e 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -140,6 +140,7 @@ jobqueue: job-extra: [] log-directory: null lsf-units: null + use-stdin: null htcondor: name: dask-worker diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 348a356d..c7301aff 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -1,6 +1,11 @@ +from distutils.version import LooseVersion + import logging import math import os +import re +import subprocess +import toolz import dask @@ -91,6 +96,13 @@ def __init__( logger.debug("Job script: \n %s" % self.job_script()) + async def _submit_job(self, script_filename): + if use_stdin(): + piped_cmd = [self.submit_command + "< " + script_filename + " 2> /dev/null"] + return self._call(piped_cmd, shell=True) + else: + return super()._submit_job(script_filename) + def lsf_format_bytes_ceil(n, lsf_units="mb"): """ Format bytes as text @@ -196,3 +208,17 @@ class LSFCluster(JobQueueCluster): ) job_cls = LSFJob config_name = "lsf" + + +def use_stdin(): + if dask.config.get("jobqueue.lsf.use-stdin") is not None: + return dask.config.get("jobqueue.lsf.use-stdin") + + return lsf_version() <= "9" + + +@toolz.memoize +def lsf_version(): + out, _ = subprocess.Popen("lsid", stdout=subprocess.PIPE).communicate() + version = re.search("(\d\.)+\d+", out) + return LooseVersion(version) diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index 7915a1bb..a03d953f 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -306,3 +306,9 @@ def test_lsf_unit_detection(lsf_units_string, expected_unit): def test_lsf_unit_detection_without_file(): lsf_unit_detection_helper("kb", conf_text=None) + + +@pytest.mark.parametrize("stdin", [True, False]) +def test_stdin(stdin): + with dask.config.set({"jobqueue.lsf.use-stdin": stdin}): + assert lsf.use_stdin() is stdin From 4b8af81003db0a57df3f3821f7a8aa01807f4c04 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Fri, 4 Oct 2019 12:20:49 -0500 Subject: [PATCH 2/4] use raw string --- dask_jobqueue/lsf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index c7301aff..58e361ce 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -220,5 +220,5 @@ def use_stdin(): @toolz.memoize def lsf_version(): out, _ = subprocess.Popen("lsid", stdout=subprocess.PIPE).communicate() - version = re.search("(\d\.)+\d+", out) + version = re.search(r"(\d\.)+\d+", out) return LooseVersion(version) From 1091ddbc1706ff3c942ae1c8d9f59661ef634600 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 5 Oct 2019 10:39:01 -0500 Subject: [PATCH 3/4] Handle string and re group properly --- dask_jobqueue/lsf.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 58e361ce..dd853799 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -214,11 +214,11 @@ def use_stdin(): if dask.config.get("jobqueue.lsf.use-stdin") is not None: return dask.config.get("jobqueue.lsf.use-stdin") - return lsf_version() <= "9" + return lsf_version() < "10" @toolz.memoize def lsf_version(): out, _ = subprocess.Popen("lsid", stdout=subprocess.PIPE).communicate() - version = re.search(r"(\d\.)+\d+", out) + version = re.search(r"(\d+\.)+\d+", out.decode()).group() return LooseVersion(version) From 25e71c0b8218e393ab88fea46860aadbf6db9b01 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 5 Oct 2019 10:49:32 -0500 Subject: [PATCH 4/4] await submit_job --- dask_jobqueue/lsf.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index dd853799..f3ad6495 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -101,7 +101,8 @@ async def _submit_job(self, script_filename): piped_cmd = [self.submit_command + "< " + script_filename + " 2> /dev/null"] return self._call(piped_cmd, shell=True) else: - return super()._submit_job(script_filename) + result = await super()._submit_job(script_filename) + return result def lsf_format_bytes_ceil(n, lsf_units="mb"):