Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import pytest

import dask_jobqueue.lsf


def pytest_addoption(parser):
parser.addoption(
Expand All @@ -27,3 +29,18 @@ def pytest_runtest_setup(item):
if envnames:
if item.config.getoption("-E") not in envnames:
pytest.skip("test requires env in %r" % envnames)


@pytest.fixture(autouse=True)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to not use autouse here, so that the fixture is explicitly used in test_use_stdin? My preference would be to avoid pytest magic if possible.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since lsf_version() is called by default (unless use-stdin is specified in the config), then this monkey-patch is needed by all tests that instantiate LSFCluster(). That includes every test in test_lsf.py and also half of the tests in test_jobqueue_core.py.

I'm not a pytest expert, but IIUC, we need to use autouse=True or we need to add this fixture to every test that needs it in those two files. Is there some better mechanism I'm missing?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, in the future, if we simply use use-stdin: true by default, then we can forbid use-stdin: null. At that point, there will be no need for lsf_version() anyway. We can delete it, along with this test fixture.

In other words, it's probably not worth debating the technical details of this fixture if we're going to delete it soon, anyway.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right I missed that. I think we can keep it like this for this PR.

When we switch to use_stdin=True, we should remove the lsid logic (and so this autouse fixture). Basically, we thought there was a change in behaviour linked to LSF 10 and my current understanding is that this is not the case but is linked to some quirks on Summit ...

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like our messages crossed, oh well ... looks like we agree anyway.

def mock_lsf_version(monkeypatch, request):
# Monkey-patch lsf_version() UNLESS the 'lsf' environment is selected.
# In that case, the real lsf_version() function should work.
markers = list(request.node.iter_markers())
if any("lsf" in marker.args for marker in markers):
return

try:
dask_jobqueue.lsf.lsf_version()
except OSError:
# Provide a fake implementation of lsf_version()
monkeypatch.setattr(dask_jobqueue.lsf, "lsf_version", lambda: "10")
2 changes: 1 addition & 1 deletion dask_jobqueue/jobqueue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ jobqueue:
job-extra: []
log-directory: null
lsf-units: null
use-stdin: null
use-stdin: null # (bool) How jobs are launched, i.e. 'bsub jobscript.sh' or 'bsub < jobscript.sh'

htcondor:
name: dask-worker
Expand Down
35 changes: 26 additions & 9 deletions dask_jobqueue/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(
job_extra=None,
lsf_units=None,
config_name="lsf",
use_stdin=None,
**kwargs
):
if queue is None:
Expand All @@ -46,6 +47,12 @@ def __init__(
if lsf_units is None:
lsf_units = dask.config.get("jobqueue.%s.lsf-units" % config_name)

if use_stdin is None:
use_stdin = dask.config.get("jobqueue.%s.use-stdin" % config_name)
if use_stdin is None:
use_stdin = lsf_version() < "10"
self.use_stdin = use_stdin

# Instantiate args and parameters from parent abstract class
super().__init__(*args, config_name=config_name, **kwargs)

Expand Down Expand Up @@ -97,7 +104,7 @@ def __init__(
logger.debug("Job script: \n %s" % self.job_script())

async def _submit_job(self, script_filename):
if use_stdin():
if self.use_stdin:
piped_cmd = [self.submit_command + "< " + script_filename + " 2> /dev/null"]
return self._call(piped_cmd, shell=True)
else:
Expand Down Expand Up @@ -189,12 +196,29 @@ class LSFCluster(JobQueueCluster):
lsf_units : str
Unit system for large units in resource usage set by the
LSF_UNIT_FOR_LIMITS in the lsf.conf file of a cluster.
use_stdin : bool
LSF's ``bsub`` command allows us to launch a job by passing it as an
argument (``bsub /tmp/jobscript.sh``) or feeding it to stdin
(``bsub < /tmp/jobscript.sh``). Depending on your cluster's configuration
and/or shared filesystem setup, one of those methods may not work,
forcing you to use the other one. This option controls which method
``dask-jobqueue`` will use to submit jobs via ``bsub``.

In particular, if your cluster fails to launch and the LSF log contains
an error message similar to the following:

.. code-block::

/home/someuser/.lsbatch/1571869562.66512066: line 8: /tmp/tmpva_yau8m.sh: No such file or directory

...then try passing ``use_stdin=True`` here or setting ``use-stdin: true``
in your ``jobqueue.lsf`` config section.

Examples
--------
>>> from dask_jobqueue import LSFCluster
>>> cluster = LSFCluster(queue='general', project='DaskonLSF',
... cores=15, memory='25GB')
... cores=15, memory='25GB', use_stdin=True)
>>> cluster.scale(jobs=10) # ask for 10 jobs

>>> from dask.distributed import Client
Expand All @@ -211,13 +235,6 @@ class LSFCluster(JobQueueCluster):
config_name = "lsf"


def use_stdin():
if dask.config.get("jobqueue.lsf.use-stdin") is not None:
return dask.config.get("jobqueue.lsf.use-stdin")

return lsf_version() < "10"


@toolz.memoize
def lsf_version():
out, _ = subprocess.Popen("lsid", stdout=subprocess.PIPE).communicate()
Expand Down
39 changes: 32 additions & 7 deletions dask_jobqueue/tests/test_lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,41 @@ def test_adaptive_grouped(loop):

def test_config(loop):
with dask.config.set(
{"jobqueue.lsf.walltime": "00:02", "jobqueue.lsf.local-directory": "/foo"}
{
"jobqueue.lsf.walltime": "00:02",
"jobqueue.lsf.local-directory": "/foo",
"jobqueue.lsf.use-stdin": True,
}
):
with LSFCluster(loop=loop, cores=1, memory="2GB") as cluster:
assert "00:02" in cluster.job_script()
assert "--local-directory /foo" in cluster.job_script()
assert cluster._dummy_job.use_stdin


@pytest.mark.parametrize(
"config_value,constructor_value",
[
(None, False),
(None, True),
(True, None),
(False, None),
(True, False), # Constuctor overrides config
],
)
def test_use_stdin(loop, config_value, constructor_value):
"""
Verify that use-stdin is respected when passed via the
config OR the LSFCluster() constructor
"""
with dask.config.set({"jobqueue.lsf.use-stdin": config_value}):
with LSFCluster(
loop=loop, cores=1, memory="2GB", use_stdin=constructor_value
) as cluster:
if constructor_value is not None:
assert cluster._dummy_job.use_stdin == constructor_value
else:
assert cluster._dummy_job.use_stdin == config_value


def test_config_name_lsf_takes_custom_config():
Expand All @@ -244,6 +274,7 @@ def test_config_name_lsf_takes_custom_config():
"env-extra": [],
"log-directory": None,
"shebang": "#!/usr/bin/env bash",
"use-stdin": None,
}

with dask.config.set({"jobqueue.lsf-config-name": conf}):
Expand Down Expand Up @@ -306,9 +337,3 @@ def test_lsf_unit_detection(lsf_units_string, expected_unit):

def test_lsf_unit_detection_without_file():
lsf_unit_detection_helper("kb", conf_text=None)


@pytest.mark.parametrize("stdin", [True, False])
def test_stdin(stdin):
with dask.config.set({"jobqueue.lsf.use-stdin": stdin}):
assert lsf.use_stdin() is stdin