Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ dependencies:
- black
- pytest
- pytest-asyncio
- cryptography
2 changes: 1 addition & 1 deletion ci/pbs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ function jobqueue_before_install {
}

function jobqueue_install {
docker exec pbs_master /bin/bash -c "cd /dask-jobqueue; pip install -e ."
docker exec pbs_master /bin/bash -c "cd /dask-jobqueue; pip install -e .; chown -R pbsuser ."
Comment thread
guillaumeeb marked this conversation as resolved.
}

function jobqueue_script {
Expand Down
86 changes: 85 additions & 1 deletion dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import sys
import weakref
import abc
import tempfile
import copy
import warnings

import dask

Expand Down Expand Up @@ -83,6 +86,9 @@
scheduler_cls : type
Changes the class of the used Dask Scheduler. Defaults to Dask's
:class:`distributed.Scheduler`.
shared_temp_directory : str
Shared directory between scheduler and worker (used for example by temporary
security certificates) defaults to current working directory if not set.
""".strip()


Expand Down Expand Up @@ -439,6 +445,7 @@ def __init__(
# Cluster keywords
loop=None,
security=None,
shared_temp_directory=None,
silence_logs="error",
name=None,
asynchronous=False,
Expand Down Expand Up @@ -520,10 +527,17 @@ def __init__(
"options": scheduler_options,
}

if shared_temp_directory is None:
shared_temp_directory = dask.config.get(
"jobqueue.%s.shared-temp-directory" % config_name
)
self.shared_temp_directory = shared_temp_directory

job_kwargs["config_name"] = config_name
job_kwargs["interface"] = interface
job_kwargs["protocol"] = protocol
job_kwargs["security"] = security
job_kwargs["security"] = self._get_worker_security(security)
Comment thread
guillaumeeb marked this conversation as resolved.

self._job_kwargs = job_kwargs

worker = {"cls": self.job_cls, "options": self._job_kwargs}
Expand Down Expand Up @@ -607,6 +621,76 @@ def _new_worker_name(self, worker_number):
cluster_name=self._name, worker_number=worker_number
)

def _get_worker_security(self, security):
Comment thread
guillaumeeb marked this conversation as resolved.
"""Dump temporary parts of the security object into a shared_temp_directory"""
if security is None:
return None

worker_security_dict = security.get_tls_config_for_role("worker")

# dumping of certificates only needed if multiline in-memory keys are contained
if not any(
[
(value is not None and "\n" in value)
for value in worker_security_dict.values()
]
):
return security
# a shared temp directory should be configured correctly
elif self.shared_temp_directory is None:
shared_temp_directory = os.getcwd()
warnings.warn(
"Using a temporary security object without explicitly setting a shared_temp_directory: \
writing temp files to current working directory ({}) instead. You can set this value by \
using dask for e.g. `dask.config.set({{'jobqueue.pbs.shared_temp_directory': '~'}})`\
or by setting this value in the config file found in `~/.config/dask/jobqueue.yaml` ".format(
shared_temp_directory
),
category=UserWarning,
)
else:
shared_temp_directory = os.path.expanduser(
os.path.expandvars(self.shared_temp_directory)
)

security = copy.copy(security)

for key, value in worker_security_dict.items():
# dump worker in-memory keys for use in job_script
if value is not None and "\n" in value:

try:
f = tempfile.NamedTemporaryFile(
mode="wt",
prefix=".dask-jobqueue.worker." + key + ".",
dir=shared_temp_directory,
)
except OSError as e:
raise OSError(
'failed to dump security objects into shared_temp_directory({})"'.format(
shared_temp_directory
)
) from e

# make sure that the file is bound to life time of self by keeping a reference to the file handle
setattr(self, "_job_" + key, f)
f.write(value)
f.flush()
# allow expanding of vars and user paths in remote script
if self.shared_temp_directory is not None:
fname = os.path.join(
self.shared_temp_directory, os.path.basename(f.name)
)
else:
fname = f.name
setattr(
security,
"tls_" + ("worker_" if key != "ca_file" else "") + key,
fname,
)

return security

def scale(self, n=None, jobs=0, memory=None, cores=None):
"""Scale cluster to specified configurations.

Expand Down
8 changes: 8 additions & 0 deletions dask_jobqueue/jobqueue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ jobqueue:
interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []

# OAR resource manager options
Expand All @@ -36,6 +37,7 @@ jobqueue:
interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []

# PBS resource manager options
Expand All @@ -62,6 +64,7 @@ jobqueue:
interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []

# SGE resource manager options
Expand All @@ -88,6 +91,7 @@ jobqueue:
interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []

# SLURM resource manager options
Expand Down Expand Up @@ -115,6 +119,7 @@ jobqueue:
interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []

# PBS resource manager options
Expand All @@ -141,6 +146,7 @@ jobqueue:
interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []

# LSF resource manager options
Expand Down Expand Up @@ -170,6 +176,7 @@ jobqueue:
interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []

# HTCondor Resource Manager options
Expand All @@ -194,6 +201,7 @@ jobqueue:
interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR
shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers
extra: []

env-extra: []
Expand Down
1 change: 1 addition & 0 deletions dask_jobqueue/tests/test_htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def test_config_name_htcondor_takes_custom_config():
"log-directory": None,
"shebang": "#!/usr/bin/env condor_submit",
"local-directory": "/tmp",
"shared-temp-directory": None,
}

with dask.config.set({"jobqueue.htcondor-config-name": conf}):
Expand Down
44 changes: 44 additions & 0 deletions dask_jobqueue/tests/test_jobqueue_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,3 +413,47 @@ def test_security(EnvSpecificCluster, loop):
security=security,
) as cluster:
assert "tls://" in job_script


@pytest.mark.xfail_env({"htcondor": "#535 no shared filesystem in htcondor ci"})
@pytest.mark.xfail_env({"slurm": "#535 no shared filesystem in slurm ci"})
def test_security_temporary(EnvSpecificCluster, loop):
dirname = os.path.dirname(__file__)
with EnvSpecificCluster(
cores=1,
memory="100MB",
security=Security.temporary(),
shared_temp_directory=dirname,
protocol="tls",
loop=loop,
) as cluster:
assert cluster.security
assert cluster.scheduler_spec["options"]["security"] == cluster.security
job_script = cluster.job_script()
assert "tls://" in job_script
keyfile = re.findall(r"--tls-key (\S+)", job_script)[0]
assert (
os.path.exists(keyfile)
and os.path.basename(keyfile).startswith(".dask-jobqueue.worker.key")
and os.path.dirname(keyfile) == dirname
)
certfile = re.findall(r"--tls-cert (\S+)", job_script)[0]
assert (
os.path.exists(certfile)
and os.path.basename(certfile).startswith(".dask-jobqueue.worker.cert")
and os.path.dirname(certfile) == dirname
)
cafile = re.findall(r"--tls-ca-file (\S+)", job_script)[0]
assert (
os.path.exists(cafile)
and os.path.basename(cafile).startswith(".dask-jobqueue.worker.ca_file")
and os.path.dirname(cafile) == dirname
)

cluster.scale(jobs=1)
with Client(cluster) as client:
future = client.submit(lambda x: x + 1, 10)
result = future.result(timeout=30)
assert result == 11

# TODO assert not any([os.path.exists(f) for f in [keyfile, certfile, cafile]])
1 change: 1 addition & 0 deletions dask_jobqueue/tests/test_lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ def test_config_name_lsf_takes_custom_config():
"interface": None,
"death-timeout": None,
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": [],
"env-extra": [],
"log-directory": None,
Expand Down
1 change: 1 addition & 0 deletions dask_jobqueue/tests/test_oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def test_config_name_oar_takes_custom_config():
"interface": None,
"death-timeout": None,
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": [],
"env-extra": [],
"log-directory": None,
Expand Down
1 change: 1 addition & 0 deletions dask_jobqueue/tests/test_pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ def test_config_name_pbs_takes_custom_config():
"interface": None,
"death-timeout": None,
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": [],
"env-extra": [],
"log-directory": None,
Expand Down
1 change: 1 addition & 0 deletions dask_jobqueue/tests/test_sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def test_config_name_sge_takes_custom_config():
"interface": None,
"death-timeout": None,
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": [],
"env-extra": [],
"log-directory": None,
Expand Down
1 change: 1 addition & 0 deletions dask_jobqueue/tests/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def test_config_name_slurm_takes_custom_config():
"interface": None,
"death-timeout": None,
"local-directory": "/foo",
"shared-temp-directory": None,
"extra": [],
"env-extra": [],
"log-directory": None,
Expand Down
9 changes: 9 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@
with open("requirements.txt") as f:
install_requires = f.read().strip().split("\n")

extras_require = {}

extras_require["test"] = [
"pytest",
"pytest-asyncio",
"cryptography",
]

if exists("README.rst"):
with open("README.rst") as f:
long_description = f.read()
Expand All @@ -26,6 +34,7 @@
include_package_data=True,
install_requires=install_requires,
tests_require=["pytest >= 2.7.1"],
extras_require=extras_require,
long_description=long_description,
zip_safe=False,
)