From a8b35f54580ae2393220a42a1d56bc59fb42ba8a Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sun, 12 Sep 2021 11:35:59 +0200 Subject: [PATCH 01/18] dump temporary security contexts to file --- dask_jobqueue/core.py | 24 +++++++++++++- dask_jobqueue/tests/test_jobqueue_core.py | 40 +++++++++++++++++++++++ requirements.txt | 1 + 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index a12ecdaa..fb92b33a 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -8,6 +8,8 @@ import sys import weakref import abc +import tempfile +import copy import dask @@ -219,6 +221,7 @@ def __init__( extra = extra + ["--protocol", protocol] if security: worker_security_dict = security.get_tls_config_for_role("worker") + security_command_line_list = [ ["--tls-" + key.replace("_", "-"), value] for key, value in worker_security_dict.items() @@ -523,7 +526,26 @@ def __init__( job_kwargs["config_name"] = config_name job_kwargs["interface"] = interface job_kwargs["protocol"] = protocol - job_kwargs["security"] = security + job_kwargs["security"] = copy.copy(security) + + if security is not None: + worker_security_dict = job_kwargs["security"].get_tls_config_for_role( + "worker" + ) + for key, value in worker_security_dict.items(): + # dump worker in-memory keys for use in job_script + if value is not None and "\n" in value: + f = tempfile.NamedTemporaryFile(mode="wt") + # make sure that tmpfile survives by keeping a reference + setattr(self, "_job_" + key, f) + f.write(value) + f.flush() + setattr( + job_kwargs["security"], + "tls_" + ("worker_" if key != "ca_file" else "") + key, + f.name, + ) + self._job_kwargs = job_kwargs worker = {"cls": self.job_cls, "options": self._job_kwargs} diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 175d9438..27f1e8af 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -413,3 +413,43 @@ def test_security(EnvSpecificCluster, loop): security=security, ) as cluster: assert "tls://" in job_script + + +@pytest.mark.xfail_env({"htcondor": "#535 no shared filesystem in htcondor ci"}) +@pytest.mark.xfail_env({"slurm": "#535 no shared filesystem in slurm ci"}) +def test_security_temporary(EnvSpecificCluster, loop): + dirname = os.path.dirname(__file__) + with open(os.path.join(dirname, "key.pem"), "r") as file: + key = file.read() + with open(os.path.join(dirname, "ca.pem"), "r") as file: + cert = file.read() + security = Security( + tls_ca_file=cert, + tls_scheduler_key=key, + tls_scheduler_cert=cert, + tls_worker_key=key, + tls_worker_cert=cert, + tls_client_key=key, + tls_client_cert=cert, + require_encryption=True, + ) + with EnvSpecificCluster( + cores=1, + memory="100MB", + security=security, + protocol="tls", + loop=loop, + ) as cluster: + assert cluster.security + assert cluster.scheduler_spec["options"]["security"] == cluster.security + job_script = cluster.job_script() + assert "tls://" in job_script + assert "--tls-key" in job_script + assert "--tls-cert" in job_script + assert "--tls-ca-file" in job_script + + cluster.scale(jobs=1) + with Client(cluster) as client: + future = client.submit(lambda x: x + 1, 10) + result = future.result(timeout=30) + assert result == 11 diff --git a/requirements.txt b/requirements.txt index 3c32ecaf..438617a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ dask>=2.23 distributed>=2.23 +cryptography[security] From c7174bb9cbb289d5ee708097170adc9f2cbc64bf Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sat, 27 Nov 2021 23:10:53 +0100 Subject: [PATCH 02/18] move to temp file to predefined shared directory --- dask_jobqueue/core.py | 18 +++++++++++++++++- dask_jobqueue/jobqueue.yaml | 8 ++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index fb92b33a..0fb96029 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -85,6 +85,8 @@ scheduler_cls : type Changes the class of the used Dask Scheduler. Defaults to Dask's :class:`distributed.Scheduler`. + shared_directory : str + Shared directory between scheduler and worker defaults to "~" """.strip() @@ -442,6 +444,7 @@ def __init__( # Cluster keywords loop=None, security=None, + shared_directory=None, silence_logs="error", name=None, asynchronous=False, @@ -528,6 +531,11 @@ def __init__( job_kwargs["protocol"] = protocol job_kwargs["security"] = copy.copy(security) + if shared_directory is None: + shared_directory = dask.config.get( + "jobqueue.%s.shared-directory" % config_name + ) + if security is not None: worker_security_dict = job_kwargs["security"].get_tls_config_for_role( "worker" @@ -535,7 +543,15 @@ def __init__( for key, value in worker_security_dict.items(): # dump worker in-memory keys for use in job_script if value is not None and "\n" in value: - f = tempfile.NamedTemporaryFile(mode="wt") + f = tempfile.NamedTemporaryFile( + mode="wt", + prefix=".dask-jobqueue" + key, + dir=( + os.path.expanduser(os.path.expandvars(shared_directory)) + if shared_directory is not None + else None + ), + ) # make sure that tmpfile survives by keeping a reference setattr(self, "_job_" + key, f) f.write(value) diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index d4a76546..bb12af9e 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -10,6 +10,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-directory: "~" # Shared directory between scheduler and worker extra: [] # OAR resource manager options @@ -36,6 +37,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-directory: "~" # Shared directory between scheduler and worker extra: [] # PBS resource manager options @@ -62,6 +64,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-directory: "~" # Shared directory between scheduler and worker extra: [] # SGE resource manager options @@ -88,6 +91,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-directory: "~" # Shared directory between scheduler and worker extra: [] # SLURM resource manager options @@ -115,6 +119,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-directory: "~" # Shared directory between scheduler and worker extra: [] # PBS resource manager options @@ -141,6 +146,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-directory: "~" # Shared directory between scheduler and worker extra: [] # LSF resource manager options @@ -170,6 +176,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-directory: "~" # Shared directory between scheduler and worker extra: [] # HTCondor Resource Manager options @@ -194,6 +201,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-directory: null # Shared directory between scheduler and worker extra: [] env-extra: [] From 2cf5d8b71191fea0808b2db916c9f7b3bc8034c2 Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sun, 28 Nov 2021 10:38:13 +0100 Subject: [PATCH 03/18] add shared_directory to tests and fail on using security if not set --- dask_jobqueue/core.py | 12 +++++++++++- dask_jobqueue/jobqueue.yaml | 2 +- dask_jobqueue/tests/test_htcondor.py | 1 + dask_jobqueue/tests/test_lsf.py | 1 + dask_jobqueue/tests/test_oar.py | 1 + dask_jobqueue/tests/test_pbs.py | 1 + dask_jobqueue/tests/test_sge.py | 1 + dask_jobqueue/tests/test_slurm.py | 1 + 8 files changed, 18 insertions(+), 2 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 0fb96029..e524e524 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -537,6 +537,11 @@ def __init__( ) if security is not None: + if shared_directory is None: + raise ValueError( + "Passing a security object to workers is only supported if a shared directory is configured" + ) + worker_security_dict = job_kwargs["security"].get_tls_config_for_role( "worker" ) @@ -556,10 +561,15 @@ def __init__( setattr(self, "_job_" + key, f) f.write(value) f.flush() + # allow expanding of vars and user paths in remote script + if shared_directory is not None: + fname = os.path.join(shared_directory, os.path.basename(f.name)) + else: + fname = f.name setattr( job_kwargs["security"], "tls_" + ("worker_" if key != "ca_file" else "") + key, - f.name, + fname, ) self._job_kwargs = job_kwargs diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index bb12af9e..bf0e0b13 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -201,7 +201,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR - shared-directory: null # Shared directory between scheduler and worker + shared-directory: "/tmp" # Shared directory between scheduler and worker extra: [] env-extra: [] diff --git a/dask_jobqueue/tests/test_htcondor.py b/dask_jobqueue/tests/test_htcondor.py index 81b8fbdf..dbde4e28 100644 --- a/dask_jobqueue/tests/test_htcondor.py +++ b/dask_jobqueue/tests/test_htcondor.py @@ -130,6 +130,7 @@ def test_config_name_htcondor_takes_custom_config(): "log-directory": None, "shebang": "#!/usr/bin/env condor_submit", "local-directory": "/tmp", + "shared-directory": None, } with dask.config.set({"jobqueue.htcondor-config-name": conf}): diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index 411ecf26..75035305 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -315,6 +315,7 @@ def test_config_name_lsf_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", + "shared-directory": None, "extra": [], "env-extra": [], "log-directory": None, diff --git a/dask_jobqueue/tests/test_oar.py b/dask_jobqueue/tests/test_oar.py index 1ee5b2fc..69012fe5 100644 --- a/dask_jobqueue/tests/test_oar.py +++ b/dask_jobqueue/tests/test_oar.py @@ -103,6 +103,7 @@ def test_config_name_oar_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", + "shared-directory": None, "extra": [], "env-extra": [], "log-directory": None, diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index 07b20c3b..aeda3257 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -354,6 +354,7 @@ def test_config_name_pbs_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", + "shared-directory": None, "extra": [], "env-extra": [], "log-directory": None, diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index 037e38a0..2e7abb94 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -55,6 +55,7 @@ def test_config_name_sge_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", + "shared-directory": None, "extra": [], "env-extra": [], "log-directory": None, diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index d91a1662..0218361c 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -186,6 +186,7 @@ def test_config_name_slurm_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", + "shared-directory": None, "extra": [], "env-extra": [], "log-directory": None, From f2cfcae791a0b7d87f2a64431c4fbf941e41948c Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sun, 28 Nov 2021 10:48:56 +0100 Subject: [PATCH 04/18] explicitly set shared directory for test --- dask_jobqueue/tests/test_jobqueue_core.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 27f1e8af..41e6b13f 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -437,6 +437,7 @@ def test_security_temporary(EnvSpecificCluster, loop): cores=1, memory="100MB", security=security, + shared_directory=dirname, protocol="tls", loop=loop, ) as cluster: From d7694c0366af511f2b00bf9fb5698ba64404681c Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sun, 28 Nov 2021 11:04:58 +0100 Subject: [PATCH 05/18] set the prefix of the temporary to be more readable --- dask_jobqueue/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index e524e524..5dd7c660 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -550,7 +550,7 @@ def __init__( if value is not None and "\n" in value: f = tempfile.NamedTemporaryFile( mode="wt", - prefix=".dask-jobqueue" + key, + prefix=".dask-jobqueue.worker." + key + ".", dir=( os.path.expanduser(os.path.expandvars(shared_directory)) if shared_directory is not None From d0e120afcd9dd20cc4126c23f831408927cb289a Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sun, 28 Nov 2021 11:58:16 +0100 Subject: [PATCH 06/18] clean up temporary file in context handler and test for cleanup --- dask_jobqueue/core.py | 13 +++++++++++ dask_jobqueue/tests/test_jobqueue_core.py | 28 ++++++++++++++++++++--- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 5dd7c660..a242c618 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -595,6 +595,19 @@ def __init__( if n_workers: self.scale(n_workers) + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + # delete any temporary security + if "security" in self._job_kwargs and self._job_kwargs["security"] is not None: + worker_security_dict = self._job_kwargs["security"].get_tls_config_for_role( + "worker" + ) + for key, value in worker_security_dict.items(): + if hasattr(self, "_job_" + key): + delattr(self, "_job_" + key) + @property def _dummy_job(self): """ diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 41e6b13f..3f70f376 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -433,6 +433,11 @@ def test_security_temporary(EnvSpecificCluster, loop): tls_client_cert=cert, require_encryption=True, ) + + for root, dirs, files in os.walk(dirname): + for file in files: + assert not file.startswith(".dask-jobqueue.worker.") + with EnvSpecificCluster( cores=1, memory="100MB", @@ -445,12 +450,29 @@ def test_security_temporary(EnvSpecificCluster, loop): assert cluster.scheduler_spec["options"]["security"] == cluster.security job_script = cluster.job_script() assert "tls://" in job_script - assert "--tls-key" in job_script - assert "--tls-cert" in job_script - assert "--tls-ca-file" in job_script + assert ( + "--tls-key {}".format(os.path.join(dirname, ".dask-jobqueue.worker.key.")) + in job_script + ) + assert ( + "--tls-cert {}".format(os.path.join(dirname, ".dask-jobqueue.worker.cert.")) + in job_script + ) + assert ( + "--tls-ca-file {}".format( + os.path.join(dirname, ".dask-jobqueue.worker.ca_file.") + ) + in job_script + ) cluster.scale(jobs=1) with Client(cluster) as client: future = client.submit(lambda x: x + 1, 10) result = future.result(timeout=30) assert result == 11 + cluster.close() + del cluster + + for root, dirs, files in os.walk(dirname): + for file in files: + assert root and file and not file.startswith(".dask-jobqueue.worker.") From 86977afe8336b160e479836fb65db22704aed502 Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sun, 28 Nov 2021 15:04:30 +0100 Subject: [PATCH 07/18] closing cluster in context handler and fix parallel testing and pbs ci user * added context handler to Job Cluster in core.py * make source dir owned by pbsuser * dynamically select port to fix sporadic port already in use errors --- ci/pbs.sh | 2 +- dask_jobqueue/core.py | 1 + dask_jobqueue/tests/test_jobqueue_core.py | 54 ++++++++++++----------- 3 files changed, 30 insertions(+), 27 deletions(-) diff --git a/ci/pbs.sh b/ci/pbs.sh index c8dbb249..0e2dd7c8 100644 --- a/ci/pbs.sh +++ b/ci/pbs.sh @@ -16,7 +16,7 @@ function jobqueue_before_install { } function jobqueue_install { - docker exec pbs_master /bin/bash -c "cd /dask-jobqueue; pip install -e ." + docker exec pbs_master /bin/bash -c "cd /dask-jobqueue; pip install -e .; chown -R pbsuser ." } function jobqueue_script { diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index a242c618..5e0f4203 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -607,6 +607,7 @@ def __exit__(self, type, value, traceback): for key, value in worker_security_dict.items(): if hasattr(self, "_job_" + key): delattr(self, "_job_" + key) + self.close() @property def _dummy_job(self): diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 3f70f376..52a499e8 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -257,10 +257,20 @@ def test_default_number_of_worker_processes(Cluster): assert " --nthreads 2" in cluster.job_script() -def test_scheduler_options(Cluster): +def get_interface_and_port(index=0): net_if_addrs = psutil.net_if_addrs() - interface = list(net_if_addrs.keys())[0] - port = 8804 + interface = list(net_if_addrs.keys())[index] + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind((net_if_addrs[interface][0].address, 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + port = s.getsockname()[1] + s.close() + return (interface, port) + + +def test_scheduler_options(Cluster): + + interface, port = get_interface_and_port() with Cluster( cores=1, @@ -323,13 +333,9 @@ def test_cluster_error_scheduler_arguments_should_use_scheduler_options(Cluster) def test_import_scheduler_options_from_config(Cluster): + config_scheduler_interface, config_scheduler_port = get_interface_and_port() - net_if_addrs = psutil.net_if_addrs() - - config_scheduler_interface = list(net_if_addrs.keys())[0] - config_scheduler_port = 8804 - - pass_scheduler_interface = list(net_if_addrs.keys())[1] + pass_scheduler_interface, _ = get_interface_and_port() scheduler_options = { "interface": config_scheduler_interface, @@ -434,10 +440,6 @@ def test_security_temporary(EnvSpecificCluster, loop): require_encryption=True, ) - for root, dirs, files in os.walk(dirname): - for file in files: - assert not file.startswith(".dask-jobqueue.worker.") - with EnvSpecificCluster( cores=1, memory="100MB", @@ -450,19 +452,23 @@ def test_security_temporary(EnvSpecificCluster, loop): assert cluster.scheduler_spec["options"]["security"] == cluster.security job_script = cluster.job_script() assert "tls://" in job_script + keyfile = re.findall(r"--tls-key (\S+)", job_script)[0] assert ( - "--tls-key {}".format(os.path.join(dirname, ".dask-jobqueue.worker.key.")) - in job_script + os.path.exists(keyfile) + and os.path.basename(keyfile).startswith(".dask-jobqueue.worker.key") + and os.path.dirname(keyfile) == dirname ) + certfile = re.findall(r"--tls-cert (\S+)", job_script)[0] assert ( - "--tls-cert {}".format(os.path.join(dirname, ".dask-jobqueue.worker.cert.")) - in job_script + os.path.exists(certfile) + and os.path.basename(certfile).startswith(".dask-jobqueue.worker.cert") + and os.path.dirname(certfile) == dirname ) + cafile = re.findall(r"--tls-ca-file (\S+)", job_script)[0] assert ( - "--tls-ca-file {}".format( - os.path.join(dirname, ".dask-jobqueue.worker.ca_file.") - ) - in job_script + os.path.exists(cafile) + and os.path.basename(cafile).startswith(".dask-jobqueue.worker.ca_file") + and os.path.dirname(cafile) == dirname ) cluster.scale(jobs=1) @@ -470,9 +476,5 @@ def test_security_temporary(EnvSpecificCluster, loop): future = client.submit(lambda x: x + 1, 10) result = future.result(timeout=30) assert result == 11 - cluster.close() - del cluster - for root, dirs, files in os.walk(dirname): - for file in files: - assert root and file and not file.startswith(".dask-jobqueue.worker.") + assert not any([os.path.exists(f) for f in [keyfile, certfile, cafile]]) From 36a4686e76896676982fd3ab6eb2d9db4901bff5 Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sun, 28 Nov 2021 19:53:26 +0100 Subject: [PATCH 08/18] revert cleanup using context manager --- dask_jobqueue/core.py | 14 -------------- dask_jobqueue/tests/test_jobqueue_core.py | 2 +- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 5e0f4203..5dd7c660 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -595,20 +595,6 @@ def __init__( if n_workers: self.scale(n_workers) - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - # delete any temporary security - if "security" in self._job_kwargs and self._job_kwargs["security"] is not None: - worker_security_dict = self._job_kwargs["security"].get_tls_config_for_role( - "worker" - ) - for key, value in worker_security_dict.items(): - if hasattr(self, "_job_" + key): - delattr(self, "_job_" + key) - self.close() - @property def _dummy_job(self): """ diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 52a499e8..df3feb9d 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -477,4 +477,4 @@ def test_security_temporary(EnvSpecificCluster, loop): result = future.result(timeout=30) assert result == 11 - assert not any([os.path.exists(f) for f in [keyfile, certfile, cafile]]) + # TODO assert not any([os.path.exists(f) for f in [keyfile, certfile, cafile]]) From 659d1c895d2447b90789e7a4a4bc4959e6523d6d Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sun, 28 Nov 2021 20:13:15 +0100 Subject: [PATCH 09/18] revert unrelated changes --- dask_jobqueue/core.py | 1 - dask_jobqueue/tests/test_jobqueue_core.py | 24 +++++++++-------------- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 5dd7c660..047e82db 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -223,7 +223,6 @@ def __init__( extra = extra + ["--protocol", protocol] if security: worker_security_dict = security.get_tls_config_for_role("worker") - security_command_line_list = [ ["--tls-" + key.replace("_", "-"), value] for key, value in worker_security_dict.items() diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index df3feb9d..b5b8710f 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -257,20 +257,10 @@ def test_default_number_of_worker_processes(Cluster): assert " --nthreads 2" in cluster.job_script() -def get_interface_and_port(index=0): - net_if_addrs = psutil.net_if_addrs() - interface = list(net_if_addrs.keys())[index] - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind((net_if_addrs[interface][0].address, 0)) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - port = s.getsockname()[1] - s.close() - return (interface, port) - - def test_scheduler_options(Cluster): - - interface, port = get_interface_and_port() + net_if_addrs = psutil.net_if_addrs() + interface = list(net_if_addrs.keys())[0] + port = 8804 with Cluster( cores=1, @@ -333,9 +323,13 @@ def test_cluster_error_scheduler_arguments_should_use_scheduler_options(Cluster) def test_import_scheduler_options_from_config(Cluster): - config_scheduler_interface, config_scheduler_port = get_interface_and_port() - pass_scheduler_interface, _ = get_interface_and_port() + net_if_addrs = psutil.net_if_addrs() + + config_scheduler_interface = list(net_if_addrs.keys())[0] + config_scheduler_port = 8804 + + pass_scheduler_interface = list(net_if_addrs.keys())[1] scheduler_options = { "interface": config_scheduler_interface, From bf0c957416b403cc4b6de65b633cb3f91f5e5060 Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Mon, 29 Nov 2021 12:53:00 +0100 Subject: [PATCH 10/18] refactor into function --- dask_jobqueue/core.py | 79 ++++++++++++++++++++++++------------------- 1 file changed, 45 insertions(+), 34 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 047e82db..0cf69635 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -525,15 +525,11 @@ def __init__( "options": scheduler_options, } - job_kwargs["config_name"] = config_name - job_kwargs["interface"] = interface - job_kwargs["protocol"] = protocol - job_kwargs["security"] = copy.copy(security) - if shared_directory is None: shared_directory = dask.config.get( "jobqueue.%s.shared-directory" % config_name ) + self.shared_directory = shared_directory if security is not None: if shared_directory is None: @@ -541,35 +537,10 @@ def __init__( "Passing a security object to workers is only supported if a shared directory is configured" ) - worker_security_dict = job_kwargs["security"].get_tls_config_for_role( - "worker" - ) - for key, value in worker_security_dict.items(): - # dump worker in-memory keys for use in job_script - if value is not None and "\n" in value: - f = tempfile.NamedTemporaryFile( - mode="wt", - prefix=".dask-jobqueue.worker." + key + ".", - dir=( - os.path.expanduser(os.path.expandvars(shared_directory)) - if shared_directory is not None - else None - ), - ) - # make sure that tmpfile survives by keeping a reference - setattr(self, "_job_" + key, f) - f.write(value) - f.flush() - # allow expanding of vars and user paths in remote script - if shared_directory is not None: - fname = os.path.join(shared_directory, os.path.basename(f.name)) - else: - fname = f.name - setattr( - job_kwargs["security"], - "tls_" + ("worker_" if key != "ca_file" else "") + key, - fname, - ) + job_kwargs["config_name"] = config_name + job_kwargs["interface"] = interface + job_kwargs["protocol"] = protocol + job_kwargs["security"] = self._get_worker_security(security) self._job_kwargs = job_kwargs @@ -654,6 +625,46 @@ def _new_worker_name(self, worker_number): cluster_name=self._name, worker_number=worker_number ) + def _get_worker_security(self, security): + """Dump temporary parts of the security object into a shared_directory""" + if security is not None: + copied_on_write = False + worker_security_dict = security.get_tls_config_for_role("worker") + for key, value in worker_security_dict.items(): + # dump worker in-memory keys for use in job_script + if value is not None and "\n" in value: + if not copied_on_write: + security = copy.copy(security) + copied_on_write = True + f = tempfile.NamedTemporaryFile( + mode="wt", + prefix=".dask-jobqueue.worker." + key + ".", + dir=( + os.path.expanduser( + os.path.expandvars(self.shared_directory) + ) + if self.shared_directory is not None + else None + ), + ) + # make sure that tmpfile survives by keeping a reference + setattr(self, "_job_" + key, f) + f.write(value) + f.flush() + # allow expanding of vars and user paths in remote script + if self.shared_directory is not None: + fname = os.path.join( + self.shared_directory, os.path.basename(f.name) + ) + else: + fname = f.name + setattr( + security, + "tls_" + ("worker_" if key != "ca_file" else "") + key, + fname, + ) + return security + def scale(self, n=None, jobs=0, memory=None, cores=None): """Scale cluster to specified configurations. From fbdbcf9c108b64516f78cd4cd46d154962d9cdcb Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Mon, 29 Nov 2021 13:34:56 +0100 Subject: [PATCH 11/18] remove unused dependency --- requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 438617a4..3c32ecaf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ dask>=2.23 distributed>=2.23 -cryptography[security] From 547afd2a3fef835b611a012c22f895dd78fae591 Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Fri, 3 Dec 2021 23:18:27 +0100 Subject: [PATCH 12/18] rename setting to shared_temp_directory and rewrote defaults and exception handling --- ci/environment.yml | 1 + ci/htcondor.sh | 2 +- dask_jobqueue/core.py | 107 +++++++++++++--------- dask_jobqueue/jobqueue.yaml | 16 ++-- dask_jobqueue/tests/test_htcondor.py | 2 +- dask_jobqueue/tests/test_jobqueue_core.py | 19 +--- dask_jobqueue/tests/test_lsf.py | 2 +- dask_jobqueue/tests/test_oar.py | 2 +- dask_jobqueue/tests/test_pbs.py | 2 +- dask_jobqueue/tests/test_sge.py | 2 +- dask_jobqueue/tests/test_slurm.py | 2 +- setup.py | 8 ++ 12 files changed, 90 insertions(+), 75 deletions(-) diff --git a/ci/environment.yml b/ci/environment.yml index 8734d9c5..df4cacb2 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -10,3 +10,4 @@ dependencies: - black - pytest - pytest-asyncio + - cryptography diff --git a/ci/htcondor.sh b/ci/htcondor.sh index 142b5b34..3f9dce62 100755 --- a/ci/htcondor.sh +++ b/ci/htcondor.sh @@ -16,7 +16,7 @@ function jobqueue_before_install { function jobqueue_install { cd ./ci/htcondor - docker-compose exec -T submit /bin/bash -c "cd /dask-jobqueue; pip3 install -e .;chown -R submituser ." + docker-compose exec -T submit /bin/bash -c "cd /dask-jobqueue; pip3 install -e .[test];chown -R submituser ." cd - } diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 0cf69635..d2fafc6a 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -10,6 +10,7 @@ import abc import tempfile import copy +import warnings import dask @@ -85,7 +86,7 @@ scheduler_cls : type Changes the class of the used Dask Scheduler. Defaults to Dask's :class:`distributed.Scheduler`. - shared_directory : str + shared_temp_directory : str Shared directory between scheduler and worker defaults to "~" """.strip() @@ -443,7 +444,7 @@ def __init__( # Cluster keywords loop=None, security=None, - shared_directory=None, + shared_temp_directory=None, silence_logs="error", name=None, asynchronous=False, @@ -525,17 +526,11 @@ def __init__( "options": scheduler_options, } - if shared_directory is None: - shared_directory = dask.config.get( - "jobqueue.%s.shared-directory" % config_name + if shared_temp_directory is None: + shared_temp_directory = dask.config.get( + "jobqueue.%s.shared-temp-directory" % config_name ) - self.shared_directory = shared_directory - - if security is not None: - if shared_directory is None: - raise ValueError( - "Passing a security object to workers is only supported if a shared directory is configured" - ) + self.shared_temp_directory = shared_temp_directory job_kwargs["config_name"] = config_name job_kwargs["interface"] = interface @@ -626,43 +621,69 @@ def _new_worker_name(self, worker_number): ) def _get_worker_security(self, security): - """Dump temporary parts of the security object into a shared_directory""" - if security is not None: - copied_on_write = False - worker_security_dict = security.get_tls_config_for_role("worker") - for key, value in worker_security_dict.items(): - # dump worker in-memory keys for use in job_script - if value is not None and "\n" in value: - if not copied_on_write: - security = copy.copy(security) - copied_on_write = True + """Dump temporary parts of the security object into a shared_temp_directory""" + if security is None: + return None + + worker_security_dict = security.get_tls_config_for_role("worker") + + # only needed if temporary security is specified which only works + if not any( + [value is not None and "\n" for value in worker_security_dict.items()] + ): + return security + elif self.shared_temp_directory is None: + shared_temp_directory = os.getcwd() + warnings.warn( + "Using a temporary security object without explicitly setting a shared_temp_directory: \ +writing temp files to current working directory ({}) instead. You can set this value by \ +using dask for e.g. `dask.config.set({{'jobqueue.pbs.shared_temp_directory': '~'}})`\ +or by setting this value in the config file found in `~/.config/dask/jobqueue.yaml` ".format( + shared_temp_directory + ), + category=UserWarning, + ) + else: + shared_temp_directory = os.path.expanduser( + os.path.expandvars(self.shared_temp_directory) + ) + + security = copy.copy(security) + + for key, value in worker_security_dict.items(): + # dump worker in-memory keys for use in job_script + if value is not None and "\n" in value: + + try: f = tempfile.NamedTemporaryFile( mode="wt", prefix=".dask-jobqueue.worker." + key + ".", - dir=( - os.path.expanduser( - os.path.expandvars(self.shared_directory) - ) - if self.shared_directory is not None - else None - ), + dir=shared_temp_directory, ) - # make sure that tmpfile survives by keeping a reference - setattr(self, "_job_" + key, f) - f.write(value) - f.flush() - # allow expanding of vars and user paths in remote script - if self.shared_directory is not None: - fname = os.path.join( - self.shared_directory, os.path.basename(f.name) + except OSError as e: + raise OSError( + 'failed to dump security objects into shared_temp_directory({})"'.format( + shared_temp_directory ) - else: - fname = f.name - setattr( - security, - "tls_" + ("worker_" if key != "ca_file" else "") + key, - fname, + ) from e + + # make sure that tmpfile survives by keeping a reference + setattr(self, "_job_" + key, f) + f.write(value) + f.flush() + # allow expanding of vars and user paths in remote script + if self.shared_temp_directory is not None: + fname = os.path.join( + self.shared_temp_directory, os.path.basename(f.name) ) + else: + fname = f.name + setattr( + security, + "tls_" + ("worker_" if key != "ca_file" else "") + key, + fname, + ) + return security def scale(self, n=None, jobs=0, memory=None, cores=None): diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index bf0e0b13..85da8044 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -10,7 +10,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR - shared-directory: "~" # Shared directory between scheduler and worker + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] # OAR resource manager options @@ -37,7 +37,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR - shared-directory: "~" # Shared directory between scheduler and worker + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] # PBS resource manager options @@ -64,7 +64,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR - shared-directory: "~" # Shared directory between scheduler and worker + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] # SGE resource manager options @@ -91,7 +91,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR - shared-directory: "~" # Shared directory between scheduler and worker + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] # SLURM resource manager options @@ -119,7 +119,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR - shared-directory: "~" # Shared directory between scheduler and worker + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] # PBS resource manager options @@ -146,7 +146,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR - shared-directory: "~" # Shared directory between scheduler and worker + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] # LSF resource manager options @@ -176,7 +176,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR - shared-directory: "~" # Shared directory between scheduler and worker + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] # HTCondor Resource Manager options @@ -201,7 +201,7 @@ jobqueue: interface: null # Network interface to use like eth0 or ib0 death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler local-directory: null # Location of fast local storage like /scratch or $TMPDIR - shared-directory: "/tmp" # Shared directory between scheduler and worker + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers extra: [] env-extra: [] diff --git a/dask_jobqueue/tests/test_htcondor.py b/dask_jobqueue/tests/test_htcondor.py index dbde4e28..99a55733 100644 --- a/dask_jobqueue/tests/test_htcondor.py +++ b/dask_jobqueue/tests/test_htcondor.py @@ -130,7 +130,7 @@ def test_config_name_htcondor_takes_custom_config(): "log-directory": None, "shebang": "#!/usr/bin/env condor_submit", "local-directory": "/tmp", - "shared-directory": None, + "shared-temp-directory": None, } with dask.config.set({"jobqueue.htcondor-config-name": conf}): diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index b5b8710f..7a05e4cd 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -419,26 +419,11 @@ def test_security(EnvSpecificCluster, loop): @pytest.mark.xfail_env({"slurm": "#535 no shared filesystem in slurm ci"}) def test_security_temporary(EnvSpecificCluster, loop): dirname = os.path.dirname(__file__) - with open(os.path.join(dirname, "key.pem"), "r") as file: - key = file.read() - with open(os.path.join(dirname, "ca.pem"), "r") as file: - cert = file.read() - security = Security( - tls_ca_file=cert, - tls_scheduler_key=key, - tls_scheduler_cert=cert, - tls_worker_key=key, - tls_worker_cert=cert, - tls_client_key=key, - tls_client_cert=cert, - require_encryption=True, - ) - with EnvSpecificCluster( cores=1, memory="100MB", - security=security, - shared_directory=dirname, + security=Security.temporary(), + shared_temp_directory=dirname, protocol="tls", loop=loop, ) as cluster: diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index 75035305..6e0489dc 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -315,7 +315,7 @@ def test_config_name_lsf_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", - "shared-directory": None, + "shared-temp-directory": None, "extra": [], "env-extra": [], "log-directory": None, diff --git a/dask_jobqueue/tests/test_oar.py b/dask_jobqueue/tests/test_oar.py index 69012fe5..8e4c409d 100644 --- a/dask_jobqueue/tests/test_oar.py +++ b/dask_jobqueue/tests/test_oar.py @@ -103,7 +103,7 @@ def test_config_name_oar_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", - "shared-directory": None, + "shared-temp-directory": None, "extra": [], "env-extra": [], "log-directory": None, diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index aeda3257..887b69f3 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -354,7 +354,7 @@ def test_config_name_pbs_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", - "shared-directory": None, + "shared-temp-directory": None, "extra": [], "env-extra": [], "log-directory": None, diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index 2e7abb94..f743f90b 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -55,7 +55,7 @@ def test_config_name_sge_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", - "shared-directory": None, + "shared-temp-directory": None, "extra": [], "env-extra": [], "log-directory": None, diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index 0218361c..3218cacf 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -186,7 +186,7 @@ def test_config_name_slurm_takes_custom_config(): "interface": None, "death-timeout": None, "local-directory": "/foo", - "shared-directory": None, + "shared-temp-directory": None, "extra": [], "env-extra": [], "log-directory": None, diff --git a/setup.py b/setup.py index e5966511..a81c38cd 100755 --- a/setup.py +++ b/setup.py @@ -8,6 +8,14 @@ with open("requirements.txt") as f: install_requires = f.read().strip().split("\n") +extras_require = {} + +extras_require["test"] = [ + "pytest", + "pytest-asyncio", + "cryptography", +] + if exists("README.rst"): with open("README.rst") as f: long_description = f.read() From 89b9820fcc0bac709533008784298fa630f253ce Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Fri, 3 Dec 2021 23:34:39 +0100 Subject: [PATCH 13/18] revert change in htcondor because it xfails anyways and the problem with the ci is unrelated --- ci/htcondor.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/htcondor.sh b/ci/htcondor.sh index 3f9dce62..142b5b34 100755 --- a/ci/htcondor.sh +++ b/ci/htcondor.sh @@ -16,7 +16,7 @@ function jobqueue_before_install { function jobqueue_install { cd ./ci/htcondor - docker-compose exec -T submit /bin/bash -c "cd /dask-jobqueue; pip3 install -e .[test];chown -R submituser ." + docker-compose exec -T submit /bin/bash -c "cd /dask-jobqueue; pip3 install -e .;chown -R submituser ." cd - } From aecfcd09d80308336bdbc3be12cb9b946fdbdc9c Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Fri, 3 Dec 2021 23:46:51 +0100 Subject: [PATCH 14/18] forgot to set extras_required tests_required is deprecated but kept as in distributed --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index a81c38cd..0d8b62f6 100755 --- a/setup.py +++ b/setup.py @@ -34,6 +34,7 @@ include_package_data=True, install_requires=install_requires, tests_require=["pytest >= 2.7.1"], + extras_require=extras_require, long_description=long_description, zip_safe=False, ) From 18925dc6c4a34599ed12ac081647b110141af24d Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sat, 4 Dec 2021 09:06:01 +0100 Subject: [PATCH 15/18] fix incomplete comment --- dask_jobqueue/core.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index d2fafc6a..c4051c14 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -627,11 +627,12 @@ def _get_worker_security(self, security): worker_security_dict = security.get_tls_config_for_role("worker") - # only needed if temporary security is specified which only works + # dumping of certificates only needed if multiline in-memory keys are contained if not any( [value is not None and "\n" for value in worker_security_dict.items()] ): return security + # a shared temp directory should be configured correctly elif self.shared_temp_directory is None: shared_temp_directory = os.getcwd() warnings.warn( @@ -667,7 +668,7 @@ def _get_worker_security(self, security): ) ) from e - # make sure that tmpfile survives by keeping a reference + # make sure that the file is bound to life time of self by keeping a reference to the file handle setattr(self, "_job_" + key, f) f.write(value) f.flush() From a7d18bbbb8d6d752f962c385dadeb6d0d396d2a7 Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sat, 4 Dec 2021 10:26:39 +0100 Subject: [PATCH 16/18] Fix comment Co-authored-by: Guillaume Eynard-Bontemps --- dask_jobqueue/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index c4051c14..6523dda3 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -87,7 +87,7 @@ Changes the class of the used Dask Scheduler. Defaults to Dask's :class:`distributed.Scheduler`. shared_temp_directory : str - Shared directory between scheduler and worker defaults to "~" + Shared directory between scheduler and worker (used for example by temporary security certificates) defaults to current working directory if not set. """.strip() From 6340a7fe07258348c90a72894fc4a653b3227a13 Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sat, 4 Dec 2021 10:32:16 +0100 Subject: [PATCH 17/18] fixed error in check --- dask_jobqueue/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index c4051c14..3922d3d1 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -629,7 +629,7 @@ def _get_worker_security(self, security): # dumping of certificates only needed if multiline in-memory keys are contained if not any( - [value is not None and "\n" for value in worker_security_dict.items()] + [(value is not None and "\n" in value) for value in worker_security_dict.values()] ): return security # a shared temp directory should be configured correctly From 18da41654d23e6a991a7b969691f44aacb6b22ab Mon Sep 17 00:00:00 2001 From: Till Riedel Date: Sat, 4 Dec 2021 10:50:17 +0100 Subject: [PATCH 18/18] fixed formatting --- dask_jobqueue/core.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index fd1a9bd7..de1338d6 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -87,7 +87,8 @@ Changes the class of the used Dask Scheduler. Defaults to Dask's :class:`distributed.Scheduler`. shared_temp_directory : str - Shared directory between scheduler and worker (used for example by temporary security certificates) defaults to current working directory if not set. + Shared directory between scheduler and worker (used for example by temporary + security certificates) defaults to current working directory if not set. """.strip() @@ -629,7 +630,10 @@ def _get_worker_security(self, security): # dumping of certificates only needed if multiline in-memory keys are contained if not any( - [(value is not None and "\n" in value) for value in worker_security_dict.values()] + [ + (value is not None and "\n" in value) + for value in worker_security_dict.values() + ] ): return security # a shared temp directory should be configured correctly