Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dask_jobqueue/jobqueue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ jobqueue:
job-extra: []
log-directory: null
lsf-units: null
use-stdin: null

htcondor:
name: dask-worker
Expand Down
27 changes: 27 additions & 0 deletions dask_jobqueue/lsf.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from distutils.version import LooseVersion

import logging
import math
import os
import re
import subprocess
import toolz

import dask

Expand Down Expand Up @@ -91,6 +96,14 @@ def __init__(

logger.debug("Job script: \n %s" % self.job_script())

async def _submit_job(self, script_filename):
if use_stdin():
piped_cmd = [self.submit_command + "< " + script_filename + " 2> /dev/null"]
return self._call(piped_cmd, shell=True)
else:
result = await super()._submit_job(script_filename)
return result


def lsf_format_bytes_ceil(n, lsf_units="mb"):
""" Format bytes as text
Expand Down Expand Up @@ -196,3 +209,17 @@ class LSFCluster(JobQueueCluster):
)
job_cls = LSFJob
config_name = "lsf"


def use_stdin():
if dask.config.get("jobqueue.lsf.use-stdin") is not None:
return dask.config.get("jobqueue.lsf.use-stdin")

return lsf_version() < "10"


@toolz.memoize
def lsf_version():
out, _ = subprocess.Popen("lsid", stdout=subprocess.PIPE).communicate()
version = re.search(r"(\d+\.)+\d+", out.decode()).group()
return LooseVersion(version)
6 changes: 6 additions & 0 deletions dask_jobqueue/tests/test_lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,9 @@ def test_lsf_unit_detection(lsf_units_string, expected_unit):

def test_lsf_unit_detection_without_file():
lsf_unit_detection_helper("kb", conf_text=None)


@pytest.mark.parametrize("stdin", [True, False])
def test_stdin(stdin):
with dask.config.set({"jobqueue.lsf.use-stdin": stdin}):
assert lsf.use_stdin() is stdin