diff --git a/ci/environment.yml b/ci/environment.yml index 8734d9c5..df4cacb2 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -10,3 +10,4 @@ dependencies: - black - pytest - pytest-asyncio + - cryptography diff --git a/ci/pbs.sh b/ci/pbs.sh index c8dbb249..0e2dd7c8 100644 --- a/ci/pbs.sh +++ b/ci/pbs.sh @@ -16,7 +16,7 @@ function jobqueue_before_install { } function jobqueue_install { - docker exec pbs_master /bin/bash -c "cd /dask-jobqueue; pip install -e ." + docker exec pbs_master /bin/bash -c "cd /dask-jobqueue; pip install -e .; chown -R pbsuser ." } function jobqueue_script { diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index a12ecdaa..de1338d6 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -8,6 +8,9 @@ import sys import weakref import abc +import tempfile +import copy +import warnings import dask @@ -83,6 +86,9 @@ scheduler_cls : type Changes the class of the used Dask Scheduler. Defaults to Dask's :class:`distributed.Scheduler`. + shared_temp_directory : str + Shared directory between scheduler and worker (used for example by temporary + security certificates) defaults to current working directory if not set. """.strip() @@ -439,6 +445,7 @@ def __init__( # Cluster keywords loop=None, security=None, + shared_temp_directory=None, silence_logs="error", name=None, asynchronous=False, @@ -520,10 +527,17 @@ def __init__( "options": scheduler_options, } + if shared_temp_directory is None: + shared_temp_directory = dask.config.get( + "jobqueue.%s.shared-temp-directory" % config_name + ) + self.shared_temp_directory = shared_temp_directory + job_kwargs["config_name"] = config_name job_kwargs["interface"] = interface job_kwargs["protocol"] = protocol - job_kwargs["security"] = security + job_kwargs["security"] = self._get_worker_security(security) + self._job_kwargs = job_kwargs worker = {"cls": self.job_cls, "options": self._job_kwargs} @@ -607,6 +621,76 @@ def _new_worker_name(self, worker_number): cluster_name=self._name, worker_number=worker_number ) + def _get_worker_security(self, security): + """Dump temporary parts of the security object into a shared_temp_directory""" + if security is None: + return None + + worker_security_dict = security.get_tls_config_for_role("worker") + + # dumping of certificates only needed if multiline in-memory keys are contained + if not any( + [ + (value is not None and "\n" in value) + for value in worker_security_dict.values() + ] + ): + return security + # a shared temp directory should be configured correctly + elif self.shared_temp_directory is None: + shared_temp_directory = os.getcwd() + warnings.warn( + "Using a temporary security object without explicitly setting a shared_temp_directory: \ +writing temp files to current working directory ({}) instead. You can set this value by \ +using dask for e.g. `dask.config.set({{'jobqueue.pbs.shared_temp_directory': '~'}})`\ +or by setting this value in the config file found in `~/.config/dask/jobqueue.yaml` ".format( + shared_temp_directory + ), + category=UserWarning, + ) + else: + shared_temp_directory = os.path.expanduser( + os.path.expandvars(self.shared_temp_directory) + ) + + security = copy.copy(security) + + for key, value in worker_security_dict.items(): + # dump worker in-memory keys for use in job_script + if value is not None and "\n" in value: + + try: + f = tempfile.NamedTemporaryFile( + mode="wt", + prefix=".dask-jobqueue.worker." + key + ".", + dir=shared_temp_directory, + ) + except OSError as e: + raise OSError( + 'failed to dump security objects into shared_temp_directory({})"'.format( + shared_temp_directory + ) + ) from e + + # make sure that the file is bound to life time of self by keeping a reference to the file handle + setattr(self, "_job_" + key, f) + f.write(value) + f.flush() + # allow expanding of vars and user paths in remote script + if self.shared_temp_directory is not None: + fname = os.path.join( + self.shared_temp_directory, os.path.basename(f.name) + ) + else: + fname = f.name + setattr( + security, + "tls_" + ("worker_" if key != "ca_file" else "") + key, + fname, + ) + + return security + def scale(self, n=None, jobs=0, memory=None, cores=None): """Scale cluster to specified configurations. diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index d4a76546..85da8044 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -10,6 +10,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] # OAR resource manager options @@ -36,6 +37,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] # PBS resource manager options @@ -62,6 +64,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] # SGE resource manager options @@ -88,6 +91,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] # SLURM resource manager options @@ -115,6 +119,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] # PBS resource manager options @@ -141,6 +146,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] # LSF resource manager options @@ -170,6 +176,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] # HTCondor Resource Manager options @@ -194,6 +201,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] env-extra: [] diff --git a/dask_jobqueue/tests/test_htcondor.py b/dask_jobqueue/tests/test_htcondor.py index 81b8fbdf..99a55733 100644 --- a/dask_jobqueue/tests/test_htcondor.py +++ b/dask_jobqueue/tests/test_htcondor.py @@ -130,6 +130,7 @@ def test_config_name_htcondor_takes_custom_config(): "log-directory": None, "shebang": "#!/usr/bin/env condor_submit", "local-directory": "/tmp", + "shared-temp-directory": None, } with dask.config.set({"jobqueue.htcondor-config-name": conf}): diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 175d9438..7a05e4cd 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -413,3 +413,47 @@ def test_security(EnvSpecificCluster, loop): security=security, ) as cluster: assert "tls://" in job_script + + +@pytest.mark.xfail_env({"htcondor": "#535 no shared filesystem in htcondor ci"}) +@pytest.mark.xfail_env({"slurm": "#535 no shared filesystem in slurm ci"}) +def test_security_temporary(EnvSpecificCluster, loop): + dirname = os.path.dirname(__file__) + with EnvSpecificCluster( + cores=1, + memory="100MB", + security=Security.temporary(), + shared_temp_directory=dirname, + protocol="tls", + loop=loop, + ) as cluster: + assert cluster.security + assert cluster.scheduler_spec["options"]["security"] == cluster.security + job_script = cluster.job_script() + assert "tls://" in job_script + keyfile = re.findall(r"--tls-key (\S+)", job_script)[0] + assert ( + os.path.exists(keyfile) + and os.path.basename(keyfile).startswith(".dask-jobqueue.worker.key") + and os.path.dirname(keyfile) == dirname + ) + certfile = re.findall(r"--tls-cert (\S+)", job_script)[0] + assert ( + os.path.exists(certfile) + and os.path.basename(certfile).startswith(".dask-jobqueue.worker.cert") + and os.path.dirname(certfile) == dirname + ) + cafile = re.findall(r"--tls-ca-file (\S+)", job_script)[0] + assert ( + os.path.exists(cafile) + and os.path.basename(cafile).startswith(".dask-jobqueue.worker.ca_file") + and os.path.dirname(cafile) == dirname + ) + + cluster.scale(jobs=1) + with Client(cluster) as client: + future = client.submit(lambda x: x + 1, 10) + result = future.result(timeout=30) + assert result == 11 + + # TODO assert not any([os.path.exists(f) for f in [keyfile, certfile, cafile]]) diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index 411ecf26..6e0489dc 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -315,6 +315,7 @@ def test_config_name_lsf_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", + "shared-temp-directory": None, "extra": [], "env-extra": [], "log-directory": None, diff --git a/dask_jobqueue/tests/test_oar.py b/dask_jobqueue/tests/test_oar.py index 1ee5b2fc..8e4c409d 100644 --- a/dask_jobqueue/tests/test_oar.py +++ b/dask_jobqueue/tests/test_oar.py @@ -103,6 +103,7 @@ def test_config_name_oar_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", + "shared-temp-directory": None, "extra": [], "env-extra": [], "log-directory": None, diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index 07b20c3b..887b69f3 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -354,6 +354,7 @@ def test_config_name_pbs_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", + "shared-temp-directory": None, "extra": [], "env-extra": [], "log-directory": None, diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index 037e38a0..f743f90b 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -55,6 +55,7 @@ def test_config_name_sge_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", + "shared-temp-directory": None, "extra": [], "env-extra": [], "log-directory": None, diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index d91a1662..3218cacf 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -186,6 +186,7 @@ def test_config_name_slurm_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", + "shared-temp-directory": None, "extra": [], "env-extra": [], "log-directory": None, diff --git a/setup.py b/setup.py index e5966511..0d8b62f6 100755 --- a/setup.py +++ b/setup.py @@ -8,6 +8,14 @@ with open("requirements.txt") as f: install_requires = f.read().strip().split("\n") +extras_require = {} + +extras_require["test"] = [ + "pytest", + "pytest-asyncio", + "cryptography", +] + if exists("README.rst"): with open("README.rst") as f: long_description = f.read() @@ -26,6 +34,7 @@ include_package_data=True, install_requires=install_requires, tests_require=["pytest >= 2.7.1"], + extras_require=extras_require, long_description=long_description, zip_safe=False, )