Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
death_timeout : float
Seconds to wait for a scheduler before closing workers
extra : list
Deprecated: use ``worker_extra_args`` instead. This parameter will be removed in a future version.
worker_extra_args : list
Additional arguments to pass to `dask-worker`
env_extra : list
Deprecated: use ``job_script_prologue`` instead. This parameter will be removed in a future version.
Expand Down Expand Up @@ -156,6 +158,7 @@ def __init__(
death_timeout=None,
local_directory=None,
extra=None,
worker_extra_args=None,
env_extra=None,
job_script_prologue=None,
header_skip=None,
Expand Down Expand Up @@ -208,6 +211,21 @@ def __init__(
)
if extra is None:
extra = dask.config.get("jobqueue.%s.extra" % self.config_name)
if worker_extra_args is None:
worker_extra_args = dask.config.get(
"jobqueue.%s.worker-extra-args" % self.config_name
)
if extra is not None:
warn = (
"extra has been renamed to worker_extra_args. "
"You are still using it (even if only set to []; please also check config files). "
"If you did not set worker_extra_args yet, extra will be respected for now, "
"but it will be removed in a future release. "
"If you already set worker_extra_args, extra is ignored and you can remove it."
)
warnings.warn(warn, FutureWarning)
if not worker_extra_args:
worker_extra_args = extra
if env_extra is None:
env_extra = dask.config.get("jobqueue.%s.env-extra" % self.config_name)
if job_script_prologue is None:
Expand All @@ -217,7 +235,7 @@ def __init__(
if env_extra is not None:
warn = (
"env_extra has been renamed to job_script_prologue. "
"You are still using it (even if only set to []; please also check config files)."
"You are still using it (even if only set to []; please also check config files). "
"If you did not set job_script_prologue yet, env_extra will be respected for now, "
"but it will be removed in a future release. "
"If you already set job_script_prologue, env_extra is ignored and you can remove it."
Expand All @@ -240,9 +258,9 @@ def __init__(
self.job_header = None

if interface:
extra = extra + ["--interface", interface]
worker_extra_args += ["--interface", interface]
if protocol:
extra = extra + ["--protocol", protocol]
worker_extra_args += ["--protocol", protocol]
if security:
worker_security_dict = security.get_tls_config_for_role("worker")
security_command_line_list = [
Expand All @@ -252,7 +270,7 @@ def __init__(
if key != "ciphers"
]
security_command_line = sum(security_command_line_list, [])
extra = extra + security_command_line
worker_extra_args += security_command_line

# Keep information on process, cores, and memory, for use in subclasses
self.worker_memory = parse_bytes(memory) if memory is not None else None
Expand Down Expand Up @@ -283,8 +301,8 @@ def __init__(
command_args += ["--death-timeout", death_timeout]
if local_directory is not None:
command_args += ["--local-directory", local_directory]
if extra is not None:
command_args += extra
if worker_extra_args is not None:
command_args += worker_extra_args

self._command_template = " ".join(map(str, command_args))

Expand Down
24 changes: 16 additions & 8 deletions dask_jobqueue/jobqueue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ jobqueue:
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []
extra: null # deprecated: use worker-extra-args
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

# OAR resource manager options
shebang: "#!/usr/bin/env bash"
Expand Down Expand Up @@ -39,7 +40,8 @@ jobqueue:
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []
extra: null # deprecated: use worker-extra-args
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

# PBS resource manager options
shebang: "#!/usr/bin/env bash"
Expand Down Expand Up @@ -67,7 +69,8 @@ jobqueue:
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []
extra: null # deprecated: use worker-extra-args
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

# SGE resource manager options
shebang: "#!/usr/bin/env bash"
Expand Down Expand Up @@ -95,7 +98,8 @@ jobqueue:
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []
extra: null # deprecated: use worker-extra-args
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

# SLURM resource manager options
shebang: "#!/usr/bin/env bash"
Expand Down Expand Up @@ -124,7 +128,8 @@ jobqueue:
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []
extra: null # deprecated: use worker-extra-args
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

# PBS resource manager options
shebang: "#!/usr/bin/env bash"
Expand Down Expand Up @@ -152,7 +157,8 @@ jobqueue:
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []
extra: null # deprecated: use worker-extra-args
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

# LSF resource manager options
shebang: "#!/usr/bin/env bash"
Expand Down Expand Up @@ -183,7 +189,8 @@ jobqueue:
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []
extra: null # deprecated: use worker-extra-args
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

# HTCondor Resource Manager options
disk: null # Total amount of disk per job
Expand All @@ -209,7 +216,8 @@ jobqueue:
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []
extra: null # deprecated: use worker-extra-args
worker-extra-args: [] # Additional arguments to pass to `dask-worker`

env-extra: null
job-script-prologue: []
Expand Down
3 changes: 2 additions & 1 deletion dask_jobqueue/tests/test_htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ def test_config_name_htcondor_takes_custom_config():
"processes": 1,
"interface": None,
"death-timeout": None,
"extra": [],
"extra": None,
"worker-extra-args": [],
"env-extra": None,
"job-script-prologue": [],
"log-directory": None,
Expand Down
66 changes: 65 additions & 1 deletion dask_jobqueue/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def test_docstring_cluster(Cluster):
assert Cluster.__name__[: -len("Cluster")] in Cluster.__doc__


def test_deprecation(Cluster):
def test_deprecation_env_extra(Cluster):
import warnings

# test issuing of warning
Expand Down Expand Up @@ -207,3 +207,67 @@ def test_deprecation(Cluster):
)
job_script = job.job_script()
assert "env_extra" in job_script


def test_deprecation_extra(Cluster):
    """Check the deprecation path from ``extra`` to ``worker_extra_args``.

    Two things are asserted:

    * a single ``FutureWarning`` mentioning the rename is recorded whenever
      ``extra`` is passed (alone or together with ``worker_extra_args``), and
      no warning is recorded when only ``worker_extra_args`` is passed;
    * ``extra`` still shows up in the generated job script when
      ``worker_extra_args`` is missing or empty, and does not show up when
      ``worker_extra_args`` is non-empty.
    """
    import warnings

    job_cls = Cluster.job_cls

    def assert_single_rename_warning(caught):
        # Exactly one FutureWarning about the rename must have been recorded.
        assert len(caught) == 1
        assert issubclass(caught[0].category, FutureWarning)
        assert "extra has been renamed" in str(caught[0].message)

    # test issuing of warning
    # The "always" filter is installed *inside* each catch_warnings() context
    # so that the process-wide filter list is restored on exit and the test
    # does not alter warning behaviour for code that runs after it.
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        # should give a warning
        job_cls(cores=1, memory="1 GB", extra=["old_param"])
    assert_single_rename_warning(w)

    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        # should give a warning
        job_cls(
            cores=1,
            memory="1 GB",
            extra=["old_param"],
            worker_extra_args=["new_param"],
        )
    assert_single_rename_warning(w)

    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        # should not give a warning
        job_cls(
            cores=1,
            memory="1 GB",
            worker_extra_args=["new_param"],
        )
    assert len(w) == 0

    # the rest is not about the warning but about behaviour: if worker_extra_args is not
    # set, extra should still be used if provided
    with warnings.catch_warnings():
        # Silence the deprecation warning only for the duration of these checks.
        warnings.simplefilter("ignore")

        job = job_cls(
            cores=1,
            memory="1 GB",
            extra=["old_param"],
            worker_extra_args=["new_param"],
        )
        job_script = job.job_script()
        assert "old_param" not in job_script
        assert "new_param" in job_script

        job = job_cls(
            cores=1,
            memory="1 GB",
            extra=["old_param"],
        )
        job_script = job.job_script()
        assert "old_param" in job_script

        # An empty worker_extra_args counts as "not set": extra is still used.
        job = job_cls(
            cores=1,
            memory="1 GB",
            extra=["old_param"],
            worker_extra_args=[],
        )
        job_script = job.job_script()
        assert "old_param" in job_script
2 changes: 1 addition & 1 deletion dask_jobqueue/tests/test_jobqueue_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def test_command_template(Cluster):
memory="4GB",
death_timeout=60,
local_directory="/scratch",
extra=["--preload", "mymodule"],
worker_extra_args=["--preload", "mymodule"],
) as cluster:
assert " --death-timeout 60" in cluster._dummy_job._command_template
assert " --local-directory /scratch" in cluster._dummy_job._command_template
Expand Down
3 changes: 2 additions & 1 deletion dask_jobqueue/tests/test_lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ def test_config_name_lsf_takes_custom_config():
"death-timeout": None,
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": [],
"extra": None,
"worker-extra-args": [],
"env-extra": None,
"job-script-prologue": [],
"log-directory": None,
Expand Down
3 changes: 2 additions & 1 deletion dask_jobqueue/tests/test_oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ def test_config_name_oar_takes_custom_config():
"death-timeout": None,
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": [],
"extra": None,
"worker-extra-args": [],
"env-extra": None,
"job-script-prologue": [],
"log-directory": None,
Expand Down
3 changes: 2 additions & 1 deletion dask_jobqueue/tests/test_pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@ def test_config_name_pbs_takes_custom_config():
"death-timeout": None,
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": [],
"extra": None,
"worker-extra-args": [],
"env-extra": None,
"job-script-prologue": [],
"log-directory": None,
Expand Down
3 changes: 2 additions & 1 deletion dask_jobqueue/tests/test_sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def test_config_name_sge_takes_custom_config():
"death-timeout": None,
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": [],
"extra": None,
"worker-extra-args": [],
"env-extra": None,
"job-script-prologue": [],
"log-directory": None,
Expand Down
3 changes: 2 additions & 1 deletion dask_jobqueue/tests/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ def test_config_name_slurm_takes_custom_config():
"death-timeout": None,
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": [],
"extra": None,
"worker-extra-args": [],
"env-extra": None,
"job-script-prologue": [],
"log-directory": None,
Expand Down
9 changes: 8 additions & 1 deletion docs/source/advanced-tips-and-tricks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,16 @@ Here is an example of how to use these parameters:

.. code-block:: python

cluster = Cluster(walltime='01:00:00', cores=4, memory='16gb', extra=["--lifetime", "55m", "--lifetime-stagger", "4m"])
cluster = Cluster(
walltime="01:00:00",
cores=4,
memory="16gb",
worker_extra_args=["--lifetime", "55m", "--lifetime-stagger", "4m"],
)
cluster.adapt(minimum=0, maximum=200)

*Note: the parameter* ``worker_extra_args`` *was named* ``extra`` *until version 0.7.4.* ``extra`` *can still
be used, but is considered deprecated and will be removed in a future version.*

Here is an example of a workflow taking advantage of this, if you want to give it a try or adapt it to your use case:

Expand Down
15 changes: 9 additions & 6 deletions docs/source/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -146,19 +146,22 @@ argument is for the specification of abstract resources, described `here
<http://distributed.dask.org/en/latest/resources.html>`__. This could be used
to specify special hardware availability that the scheduler is not aware of,
for example GPUs. Below, the arbitrary resources "ssdGB" and "GPU" are
specified. Notice that the ``extra`` keyword is used to pass through arguments
to the dask-workers.
specified. Notice that the ``worker_extra_args`` keyword is used to pass through
arguments to the dask-workers.

*Note: the parameter* ``worker_extra_args`` *was named* ``extra`` *until version
0.7.4.* ``extra`` *can still be used, but is considered deprecated and will be
removed in a future version.*

.. code-block:: python

from dask_jobqueue import SLURMCluster
from distributed import Client
from dask import delayed

cluster = SLURMCluster(memory='8g',
processes=1,
cores=2,
extra=['--resources ssdGB=200,GPU=2'])
cluster = SLURMCluster(
memory="8g", processes=1, cores=2, worker_extra_args=["--resources ssdGB=200,GPU=2"]
)

cluster.scale(2)
client = Client(cluster)
Expand Down