From 580e3195c72499444070ce71236223b629218318 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Wed, 18 Mar 2020 18:59:35 +0100 Subject: [PATCH 01/14] wip --- dask_jobqueue/core.py | 52 +++++++++++++++++++++++++-------- dask_jobqueue/htcondor.py | 6 ++-- dask_jobqueue/lsf.py | 4 +-- dask_jobqueue/oar.py | 4 +-- dask_jobqueue/pbs.py | 4 +-- dask_jobqueue/sge.py | 4 +-- dask_jobqueue/slurm.py | 4 +-- dask_jobqueue/tests/test_oar.py | 2 -- 8 files changed, 53 insertions(+), 27 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 97eb0e4f..9cfae790 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -136,6 +136,8 @@ def __init__( memory=None, processes=None, nanny=True, + protocol=None, + security=None, interface=None, death_timeout=None, local_directory=None, @@ -147,7 +149,6 @@ def __init__( python=sys.executable, job_name=None, config_name=None, - **kwargs ): self.scheduler = scheduler self.job_id = None @@ -210,6 +211,23 @@ def __init__( if interface: extra = extra + ["--interface", interface] + if protocol: + extra = extra + ["--protocol", protocol] + if security: + to_keep = ["tls_ca_file", "tls_worker_key", "tls_worker_cert"] + security_dict = {key: getattr(security, key, None) for key in to_keep} + security_dict = { + key: value for key, value in security_dict.items() if value is not None + } + security_dict = { + key.replace("worker_", ""): value + for key, value in security_dict.items() + } + security_command_line_list = [ + ["--" + key, value] for key, value in security_dict.items() + ] + security_command_line = sum(security_command_line_list, []) + extra = extra + security_command_line # Keep information on process, cores, and memory, for use in subclasses self.worker_memory = parse_bytes(memory) if memory is not None else None @@ -434,7 +452,7 @@ def __init__( protocol="tcp://", # Job keywords config_name=None, - **kwargs + **job_kwargs ): self.status = "created" @@ -499,14 +517,18 @@ def __init__( "options": scheduler_options, } - kwargs["config_name"] = config_name - kwargs["interface"] = interface - kwargs["protocol"] = protocol - kwargs["security"] = security - self._kwargs = kwargs - worker = {"cls": self.job_cls, "options": kwargs} - if "processes" in kwargs and kwargs["processes"] > 1: - worker["group"] = ["-" + str(i) for i in range(kwargs["processes"])] + job_kwargs["config_name"] = config_name + job_kwargs["interface"] = interface + job_kwargs["protocol"] = protocol + job_kwargs["security"] = security + self._job_kwargs = job_kwargs + self.validate_job_kwargs() + + worker = {"cls": self.job_cls, "options": self._job_kwargs} + if "processes" in self._job_kwargs and self._job_kwargs["processes"] > 1: + worker["group"] = [ + "-" + str(i) for i in range(self._job_kwargs["processes"]) + ] self._dummy_job # trigger property to ensure that the job is valid @@ -536,9 +558,9 @@ def _dummy_job(self): except AttributeError: address = "tcp://:8786" return self.job_cls( - scheduler=address or "tcp://:8786", + address or "tcp://:8786", name="name", - **self._kwargs + **self._job_kwargs ) @property @@ -604,3 +626,9 @@ def adapt( if maximum_jobs is not None: kwargs["maximum"] = maximum_jobs * self._dummy_job.worker_processes return super().adapt(*args, **kwargs) + + def validate_job_kwargs(self): + # TODO + # self._job_kwargs + # Check self.job_cls.__init__ signature and Job.__init__.signature + pass diff --git a/dask_jobqueue/htcondor.py b/dask_jobqueue/htcondor.py index b2c8e082..522fbb25 100644 --- a/dask_jobqueue/htcondor.py +++ b/dask_jobqueue/htcondor.py @@ -40,10 +40,10 @@ def __init__( disk=None, job_extra=None, config_name=None, - **kwargs + **base_class_kwargs ): super().__init__( - scheduler=scheduler, name=name, config_name=config_name, **kwargs + scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs ) if disk is None: @@ -60,7 +60,7 @@ def __init__( else: self.job_extra = job_extra - env_extra = kwargs.get("env_extra", None) + env_extra = base_class_kwargs.get("env_extra", None) if env_extra is None: env_extra = dask.config.get( "jobqueue.%s.env-extra" % self.config_name, default=[] diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index b2fb7036..be590f3f 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -32,10 +32,10 @@ def __init__( lsf_units=None, config_name=None, use_stdin=None, - **kwargs + **base_class_kwargs ): super().__init__( - scheduler=scheduler, name=name, config_name=config_name, **kwargs + scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs ) if queue is None: diff --git a/dask_jobqueue/oar.py b/dask_jobqueue/oar.py index 6fdf029c..13e8cceb 100644 --- a/dask_jobqueue/oar.py +++ b/dask_jobqueue/oar.py @@ -26,10 +26,10 @@ def __init__( walltime=None, job_extra=None, config_name=None, - **kwargs + **base_class_kwargs ): super().__init__( - scheduler=scheduler, name=name, config_name=config_name, **kwargs + scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs ) if queue is None: diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 43f98eaa..c01d7d3b 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -49,10 +49,10 @@ def __init__( walltime=None, job_extra=None, config_name=None, - **kwargs + **base_class_kwargs ): super().__init__( - scheduler=scheduler, name=name, config_name=config_name, **kwargs + scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs ) if queue is None: diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index c9414e1f..10fa24a6 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -22,10 +22,10 @@ def __init__( walltime=None, job_extra=None, config_name=None, - **kwargs + **base_class_kwargs ): super().__init__( - scheduler=scheduler, name=name, config_name=config_name, **kwargs + scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs ) if queue is None: diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index c498b2cc..6e6c2139 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -25,10 +25,10 @@ def __init__( job_mem=None, job_extra=None, config_name=None, - **kwargs + **base_class_kwargs ): super().__init__( - scheduler=scheduler, name=name, config_name=config_name, **kwargs + scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs ) if queue is None: diff --git a/dask_jobqueue/tests/test_oar.py b/dask_jobqueue/tests/test_oar.py index 5035852b..830b660e 100644 --- a/dask_jobqueue/tests/test_oar.py +++ b/dask_jobqueue/tests/test_oar.py @@ -19,8 +19,6 @@ def test_header(): processes=4, cores=8, memory="28GB", - job_cpu=16, - job_mem="100G", job_extra=["-t besteffort"], ) as cluster: assert "walltime=" in cluster.job_header From a8f317d6729bf48cdfa94d576a7f2472dc2c7a13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Mon, 4 May 2020 17:30:52 +0200 Subject: [PATCH 02/14] Add job kwargs checks, no tests yet ... --- dask_jobqueue/core.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 9cfae790..b86d5807 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -8,6 +8,7 @@ import sys import weakref import abc +import inspect import dask from dask.utils import ignoring @@ -627,8 +628,24 @@ def adapt( kwargs["maximum"] = maximum_jobs * self._dummy_job.worker_processes return super().adapt(*args, **kwargs) + @classmethod + def _allowed_parameters(cls, func): + sig = inspect.signature(func) + return { + k: v for k, v in sig.parameters.items() if v.kind == v.POSITIONAL_OR_KEYWORD + } + def validate_job_kwargs(self): - # TODO - # self._job_kwargs - # Check self.job_cls.__init__ signature and Job.__init__.signature - pass + allowed_parameters = set(self._allowed_parameters(Job.__init__)).union( + self._allowed_parameters(self.job_cls.__init__) + ) + # We don't want to list self as an allowed parameter + allowed_parameters.remove("self") + wrong_parameter_names = set(self._job_kwargs).difference(allowed_parameters) + wrong_parameters = {k: self._job_kwargs[k] for k in wrong_parameter_names} + if wrong_parameters: + raise ValueError( + "Wrong parameters: {}.\nHere are the list of allowed parameters: {}".format( + wrong_parameters, sorted(allowed_parameters) + ) + ) From 3efa12be0d67950a466d6c88f02387513257a2dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Mon, 25 May 2020 16:30:27 +0200 Subject: [PATCH 03/14] Add test for wrong parameters. --- dask_jobqueue/core.py | 4 +++- dask_jobqueue/tests/test_jobqueue_core.py | 17 +++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index b86d5807..90c3ff1b 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -641,7 +641,9 @@ def validate_job_kwargs(self): ) # We don't want to list self as an allowed parameter allowed_parameters.remove("self") - wrong_parameter_names = set(self._job_kwargs).difference(allowed_parameters) + wrong_parameter_names = [ + kw for kw in self._job_kwargs if kw not in allowed_parameters + ] wrong_parameters = {k: self._job_kwargs[k] for k in wrong_parameter_names} if wrong_parameters: raise ValueError( diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 39d97831..bf445c67 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -393,3 +393,20 @@ def test_import_scheduler_options_from_config(Cluster): scheduler_options = cluster.scheduler_spec["options"] assert scheduler_options.get("interface") == pass_scheduler_interface assert scheduler_options.get("port") is None + + +@pytest.mark.parametrize("Cluster", all_clusters) +def test_wrong_parameter_error(Cluster): + match = re.compile( + "Wrong parameters.+wrong_parameter.+another_wrong_parameter.+" + "list of allowed parameters.+cores.+memory", + re.DOTALL, + ) + with pytest.raises(ValueError, match=match): + create_cluster_func( + Cluster, + cores=1, + memory="1GB", + wrong_parameter="asdf", + another_wrong_parameter="asdf", + ) From 532fbd17725b81fd49873096346c3359037a837f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Mon, 25 May 2020 19:30:47 +0200 Subject: [PATCH 04/14] Add LocalCluster test for security. --- dask_jobqueue/core.py | 4 ++- dask_jobqueue/tests/test_jobqueue_core.py | 32 +++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 90c3ff1b..34a07330 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -221,7 +221,8 @@ def __init__( key: value for key, value in security_dict.items() if value is not None } security_dict = { - key.replace("worker_", ""): value + # TODO: maybe a way to do that through a distributed function? + key.replace("worker_", "").replace("_", "-"): value for key, value in security_dict.items() } security_command_line_list = [ @@ -537,6 +538,7 @@ def __init__( scheduler=scheduler, worker=worker, loop=loop, + security=security, silence_logs=silence_logs, asynchronous=asynchronous, name=name, diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index bf445c67..1fc47b68 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -9,6 +9,9 @@ import dask +from distributed.security import Security +from distributed import Client + from dask_jobqueue import ( JobQueueCluster, PBSCluster, @@ -410,3 +413,32 @@ def test_wrong_parameter_error(Cluster): wrong_parameter="asdf", another_wrong_parameter="asdf", ) + + +@pytest.mark.asyncio +async def test_security(): + dirname = os.path.dirname(__file__) + key = os.path.join(dirname, "key.pem") + cert = os.path.join(dirname, "ca.pem") + print(key) + print(cert) + security = Security( + tls_ca_file=cert, + tls_scheduler_key=key, + tls_scheduler_cert=cert, + tls_worker_key=key, + tls_worker_cert=cert, + tls_client_key=key, + tls_client_cert=cert, + require_encryption=True, + ) + + async with LocalCluster( + cores=1, memory="1GB", security=security, protocol="tls", asynchronous=True + ) as cluster: + await cluster + cluster.scale(jobs=1) + with Client(cluster, security=security, asynchronous=True) as client: + future = client.submit(lambda x: x + 1, 10) + result = await future + assert result == 11 From 1ce4a5665725cb392459b8b2db0e9fa3b907d7c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Mon, 25 May 2020 21:02:34 +0200 Subject: [PATCH 05/14] Add TLS files. --- dask_jobqueue/tests/ca.pem | 31 ++++++++++++++++++++++ dask_jobqueue/tests/key.pem | 52 +++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 dask_jobqueue/tests/ca.pem create mode 100644 dask_jobqueue/tests/key.pem diff --git a/dask_jobqueue/tests/ca.pem b/dask_jobqueue/tests/ca.pem new file mode 100644 index 00000000..46c112dd --- /dev/null +++ b/dask_jobqueue/tests/ca.pem @@ -0,0 +1,31 @@ +-----BEGIN CERTIFICATE----- +MIIFazCCA1OgAwIBAgIUHfc6OUT6HK590ABdtsRzglDZsfowDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMDA1MjUxNDQ5MzlaFw0zMDA1 +MjMxNDQ5MzlaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw +HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggIiMA0GCSqGSIb3DQEB +AQUAA4ICDwAwggIKAoICAQCb5PcZthd/Ssv4JpsxGUww9KsA+HknTQVAWUN85bC4 +BxLS8IKaE302yGBg8MK9w0z+qTOoMFNMqoo+Yqp7sDGbV/4U9qVPuJfbxddLPyQU +QenGXsyAMLEmVXCmQHXlcZCsIlY0je6c7o2bJw2oJPPktjMRNfHI+lWtzhMVOhWz +nKRMwikfaV80Vn80Jx7zY2A2kcsoGL16Y/GLp4AAhEGWtn3GFJDCCWpJM4fBeChZ +6wOxP/KnNsJZ2PeHb+yTXT347ZFRdKHB6DNDmiWk48BrPgwemKdePt+7YN1UmV88 +YCl54BecgGiu3Iav0gqCHv2JIDZtyUtfBA/rPXsUmRbus0ZW9ASCnVEbt9FtHQZ+ +e/BmLL6WDg58lGE4TeI+EaK1FqxbVEB3K5idrForTZvadKzf2licSpRP+wlRruhZ +k85lT50aY5tvQIPq+FnX2mQwosMln0hIPXvgqXDMNrSCzUZ9aCLL6PWx+wqLHq9p +dU/2upPM9AEiPh1M9nEXL+FdbQMB6wNbpo/L3yTfQ1pL3Z33KLDC9OfWCGymgarL +1uxeUbt6XeCB7Exb+1fT73RtjeO2QFTZrCjKfxR3vKIBA0/2qb0DfktAasekyx3U +TC2ZhT/wNP4LBP/l1Oasn47e6EPs2slZLPHZXfbdElsBGcGPDwh38RjSVq+fzm7b +bwIDAQABo1MwUTAdBgNVHQ4EFgQU+UmkXa3pmOtN4bN3LEm6Vbc2KVMwHwYDVR0j +BBgwFoAU+UmkXa3pmOtN4bN3LEm6Vbc2KVMwDwYDVR0TAQH/BAUwAwEB/zANBgkq +hkiG9w0BAQsFAAOCAgEAMNWu/3ci9SxiZ2YO/x4VBtvuhB2D0eAF71NXdJW+F2WN +ol+ut2mHyXeqJTqme6nwMkItuCk/M3/rtxJYa+i77PkRP+dv10jfOLNgc0nMTCKx +ll6AlhX3Mz4kt4/8g1h6oyee/Tau4VK9UTLzlxHiPSX4HrPR1g8jEyA5K9pwfX0p +RBLAlaStyemH/HkCX0HpG8PSJfEbmLsbJR5cOKtGAgt+mUnPjKGvuWOkohrOzFci +dMaWCULZX6PEdeJaXNerP7mR/BH3nOHSWja1zqr4t4h4e7hOXREseUrvUB9pHNkv +T2UrbbBMGElHVlHtxtaxdu/zNzL1DBxBkLjyw+SberFFQAQR5I0yCWOhrwzONzH6 +XPL81igvUao5BQ7quJpy58EPqGwALAkB5QDsvH4uD7Bgwp8IYAtcRWWGL8WnuEt5 +shS4yK4UXFCgUeg580+gZ7t1nH6f55/A7d/g0DdpvBBTZVYRJFpiJCnCTH6LbjI0 +XXe5vDubTxjLSZemP4w+w+Oe18U2tJ6DXb2rUZGoWECnfbYfVmO2hdLcgp2nenXl +dcRANXfZT9R34wzVx/LUJA9hjOgfFF38CC2RfOZr9bb1IjLD/X4V5SrxsfgnQ4P/ +/RKGyQ5dpeK/lpdgYJuqMgUffy8KDrQdMaBq8+0RdVL3nm5+tPFKcskMI2f/zyc= +-----END CERTIFICATE----- diff --git a/dask_jobqueue/tests/key.pem b/dask_jobqueue/tests/key.pem new file mode 100644 index 00000000..9c23d8d0 --- /dev/null +++ b/dask_jobqueue/tests/key.pem @@ -0,0 +1,52 @@ +-----BEGIN PRIVATE KEY----- +MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQCb5PcZthd/Ssv4 +JpsxGUww9KsA+HknTQVAWUN85bC4BxLS8IKaE302yGBg8MK9w0z+qTOoMFNMqoo+ +Yqp7sDGbV/4U9qVPuJfbxddLPyQUQenGXsyAMLEmVXCmQHXlcZCsIlY0je6c7o2b +Jw2oJPPktjMRNfHI+lWtzhMVOhWznKRMwikfaV80Vn80Jx7zY2A2kcsoGL16Y/GL +p4AAhEGWtn3GFJDCCWpJM4fBeChZ6wOxP/KnNsJZ2PeHb+yTXT347ZFRdKHB6DND +miWk48BrPgwemKdePt+7YN1UmV88YCl54BecgGiu3Iav0gqCHv2JIDZtyUtfBA/r +PXsUmRbus0ZW9ASCnVEbt9FtHQZ+e/BmLL6WDg58lGE4TeI+EaK1FqxbVEB3K5id +rForTZvadKzf2licSpRP+wlRruhZk85lT50aY5tvQIPq+FnX2mQwosMln0hIPXvg +qXDMNrSCzUZ9aCLL6PWx+wqLHq9pdU/2upPM9AEiPh1M9nEXL+FdbQMB6wNbpo/L +3yTfQ1pL3Z33KLDC9OfWCGymgarL1uxeUbt6XeCB7Exb+1fT73RtjeO2QFTZrCjK +fxR3vKIBA0/2qb0DfktAasekyx3UTC2ZhT/wNP4LBP/l1Oasn47e6EPs2slZLPHZ +XfbdElsBGcGPDwh38RjSVq+fzm7bbwIDAQABAoICAHFc4ddfk9yr3oEYSdg9Zitf +cA6noSpUFtKBVtM3D/fypNyhqscyDubMdVFpIqPtpkq1bewLIDfq99Z/1ytUp+4n +4YsLBJFhUYSubG26f5j/iWkIPLunLNsMXHt4+oKbv7F80qUq5O5Xhr/heUvhez3A +xIfqa2VTrQRTi4rvDyLqcIuk0VSXQnUDxUJ+hEJG7IsiH9KLkxWyIc8FQc6eXjej +gviMset3/0M15q1onCcvACNftiukZVYCsZVabXWH423mC7tpDcu897JcIK20NJOH +rjZ9mY+uNvHCcZB4a0mzP9XxgBn9QqKNmJ+4JI4UzRdvRkU1kMqKYK0Wqy8CWCqf +kw7FrRb2AO94bK3CMQ+9G0AJCLD4tlh/Xw9DIh4h2iv3G3T9GVmH0fLnF2LDKy2G +7Z0oC65s3TjeO2a9SRvlgEmf28EujjCil0RZx6Dnni171GVARohdLZaGDquj5V26 +Sc2iTaHDXKcRCyntORJ7hyJt7PsCU7nj4dMLc4q35mW4Finxt0Is5w+IGIL+fKEI +0ZrTf6eBRNU4rLagkiCmhBydmJaj8jMfl7XVHXEoOT2AR0dk8XfIK/LO9d/5zTJA +y1Wx9THg5wUK2KoimQXg+qZFw06fEjlbzEl0DlZOdOrk2O1lX+cYrW0pbUH2OKOh +TGjMN3f45sCfDEvHQg35AoIBAQDPTDnv7ZiKDI5vpl6+uNnVQRwlKQ+QrD+FxrW3 +52b23eVcPP7kcZ4xDeJ0T6ZdGGJ41SRKUn6gsojQ02yReSn02/BZVmL/shEweXhM +VrHmMbwOUYGwcVmp9jjSTH4VVEl42oMpc8JCrEAKAda09ZuG4+PbFOtGBskWnBNE +uhJHlgL5CRODLmy+jZvafsTLHIqfDlq18jxyPz0SsaXGcV5Lm2Ob6DPRKvp8578X +ehwpJcJMvqAmzwflgVj+Qb2K0/ceDvjajC3BMcp63gjCAojTbbHrIrXzS0ClSjUg +U8sfUaXe0wOAqsKIl+evQtNwiv/2QMmvC1jHI7miMeu7MSJtAoIBAQDAhR7uAzJc +quFtjfbwCJswzAsQ8W4YOioTLWFW2W/+nS4MfFlxFEuOeJpvYmViD67UFJrCY9bO +xXyQvTQPN1iVFAjqG3Ur40/+zFGm1uXR0rF2f0oafceKodT2+TyDcoLkVoi2h93k +PfgCsen+ajwXYNH4utHN9QoykRKESzwWtJOcY2vBWxItFmumFborv/6oJkCMXWC8 +o5wjIJHXZ9q/bxvNNnENnXafbYWyKlwu6f0CXeTd4Dt4zD7Ab861/F0zP4GXlrR/ +ecPhqxaS5U51VfOZp4q5Ey8HbWxVENMA9z+4dp7P+/A2WLLdyLz23tYo7wDmpqQ0 +CUZ2YNeUSWvLAoIBAQCLxczm7uBEbNT5iUcm+ALA3Mb6c2YwWUP3kpVia9+sItAM +0n/XTpioYMFJRY9aBCAZczWl+1uwRNElZPk2WWkl1cqIokvcNpeKhMzNRENtgClZ +yjFU5AjeJcwIWFVHUm670zJPF+NrCzOey8CWgWidmjk/tioxLFAYM6J2W7QJmqdk +fW8vq3TdQyRMPd+5SARb3NTjC3MgYW1vlmK9nCFFf3+5VubhaUY+RBA/5zDnubL6 +Bip8IGoloIJ95ZvE6Mkd9mBrE8uiEU2CbQWgsw7I6JTng58Fbb8n9BJAOt+hvW7H +AKbC8eB7M1mffcKNhtuxkdurcE4q7/ax21EkBaw9AoIBAHdA/svIxyWH5GQMkG5X +pmovupsgMmZngTCn56f4wNsjWib50B2vyK3UHzXn6Y040b8llEfduG4U/vhZeyoB +yqlt46fAonAxOphG0D1c2LeEn9EbQDfwue4yGM1zzfxOrq3qvHz05IpBqKNiueOS +wu5oVyiP8O53X327R5ETWYFnEhjJrTH0y+mJ/dy/kLcRExntuAY6wXWYk1tfDXg1 +KNd0Z/BSTO12IMjY+vxGKRwWbVdN+jtGbxCA1E438//e94yLRic0f1KHhsL/S9hq +mpMsTt1bXx8NtxAOxBBdf5cVkS8eq3mCQmYnw4SGmCcEGfz5L8Gwb/6b0D379w4v +/MkCggEAEwIOqhgwUjscb1L9NTehFa3CaK8eQwkEvB7aQLhTbTUK4bigSd6f20Ju +3N/zmHhSkEm6Oq9+zrjm8QqgztrB3Zn+nCWu2n7BHFoCexg4Qov3xuKsC3MMh/oa +/xqgVgsFOgqk93YWv/6IvCHM8F3Ntk+rcUlt0M1VxXC8Ym/vRHy5VN12asI19K1M +UbZINK0bVAdEdwaB2OSx14yag9uiaXxKryZe7qTjcR2ELxksVo0nwX7h0NHA1S77 +Ubq57jxR5a/wI9VA5iTUZSUkjxfEdPUEAY1eDdp+Ogqcs1Zsa8Er0JUNPxqJzmlJ +Q2ypwp0CapUz0d43a47/izrjAtXzug== +-----END PRIVATE KEY----- From a62ef3fa2612675fa5ea9c7af518cf56bf2ee184 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Mon, 25 May 2020 21:12:30 +0200 Subject: [PATCH 06/14] Remove async and debug statement. --- dask_jobqueue/tests/test_jobqueue_core.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 1fc47b68..c3675449 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -415,13 +415,10 @@ def test_wrong_parameter_error(Cluster): ) -@pytest.mark.asyncio -async def test_security(): +def test_security(): dirname = os.path.dirname(__file__) key = os.path.join(dirname, "key.pem") cert = os.path.join(dirname, "ca.pem") - print(key) - print(cert) security = Security( tls_ca_file=cert, tls_scheduler_key=key, @@ -433,12 +430,11 @@ async def test_security(): require_encryption=True, ) - async with LocalCluster( - cores=1, memory="1GB", security=security, protocol="tls", asynchronous=True + with LocalCluster( + cores=1, memory="1GB", security=security, protocol="tls" ) as cluster: - await cluster cluster.scale(jobs=1) - with Client(cluster, security=security, asynchronous=True) as client: + with Client(cluster, security=security) as client: future = client.submit(lambda x: x + 1, 10) - result = await future + result = future.result() assert result == 11 From 467bec60234ecd40fd48887054302de2a726b5ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Mon, 25 May 2020 21:32:56 +0200 Subject: [PATCH 07/14] Add changelog. --- docs/source/changelog.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index c017867b..99ad1e16 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -17,6 +17,8 @@ Development version ``threads_per_process=cores``. (:pr:`375`) - all cluster classes: ``interface`` was ignored when set in a config file. (:pr:`366`) +- all cluster classes: fix a bug that would allow to pass any named parameter without an error (:pr:`398`) +- all cluster classes: fix a bug where ``security`` was not correctly passed through (:pr:`398`) - ``LSFCluster``: switch to ``use_stdin=True`` by default (:pr:`388`). - ``LSFCluster``: add ``use_stdin`` to ``LSFCluster``. This switches between ``bsub < job_script`` and ``bsub job_script`` to launch a ``LSF`` job From 1e91e852fd8eb37337d3dd06306d62f249a257cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Mon, 25 May 2020 22:07:33 +0200 Subject: [PATCH 08/14] Improve security to comman-line argument conversion --- dask_jobqueue/core.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 34a07330..5d9f4f4e 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -215,18 +215,12 @@ def __init__( if protocol: extra = extra + ["--protocol", protocol] if security: - to_keep = ["tls_ca_file", "tls_worker_key", "tls_worker_cert"] - security_dict = {key: getattr(security, key, None) for key in to_keep} - security_dict = { - key: value for key, value in security_dict.items() if value is not None - } - security_dict = { - # TODO: maybe a way to do that through a distributed function? - key.replace("worker_", "").replace("_", "-"): value - for key, value in security_dict.items() - } + worker_security_dict = security.get_tls_config_for_role("worker") security_command_line_list = [ - ["--" + key, value] for key, value in security_dict.items() + ["--tls-" + key.replace("_", "-"), value] + for key, value in worker_security_dict.items() + # 'ciphers' parameter does not have a command-line equivalent + if key != "ciphers" ] security_command_line = sum(security_command_line_list, []) extra = extra + security_command_line From bc2a298badce7e32a2ac443c07db374289f978de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Mon, 25 May 2020 22:13:45 +0200 Subject: [PATCH 09/14] More gradual test failure. --- dask_jobqueue/tests/test_jobqueue_core.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index c3675449..b5fecd20 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -433,6 +433,13 @@ def test_security(): with LocalCluster( cores=1, memory="1GB", security=security, protocol="tls" ) as cluster: + assert cluster.security is security + assert cluster.scheduler_spec["options"]["security"] is security + job_script = cluster.job_script() + assert "--tls-key {}".format(key) in job_script + assert "--tls-cert {}".format(cert) in job_script + assert "--tls-ca-file {}".format(cert) in job_script + cluster.scale(jobs=1) with Client(cluster, security=security) as client: future = client.submit(lambda x: x + 1, 10) From cf3485cb0c8dc68c175f5bb99aa098f231efad23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Tue, 26 May 2020 12:10:52 +0200 Subject: [PATCH 10/14] Alternative error message strategy by catching TypeError. --- dask_jobqueue/core.py | 48 ++++++++--------------- dask_jobqueue/tests/test_jobqueue_core.py | 11 ++---- 2 files changed, 21 insertions(+), 38 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 5d9f4f4e..4e14c465 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -8,7 +8,6 @@ import sys import weakref import abc -import inspect import dask from dask.utils import ignoring @@ -518,7 +517,6 @@ def __init__( job_kwargs["protocol"] = protocol job_kwargs["security"] = security self._job_kwargs = job_kwargs - self.validate_job_kwargs() worker = {"cls": self.job_cls, "options": self._job_kwargs} if "processes" in self._job_kwargs and self._job_kwargs["processes"] > 1: @@ -554,11 +552,23 @@ def _dummy_job(self): address = self.scheduler.address # Have we already connected? except AttributeError: address = "tcp://:8786" - return self.job_cls( - address or "tcp://:8786", - name="name", - **self._job_kwargs - ) + try: + return self.job_cls( + address or "tcp://:8786", + name="name", + **self._job_kwargs + ) + except TypeError as exc: + match = re.search("(unexpected keyword argument.+)", str(exc)) + if not match: + raise + message_orig = match.group(1) + raise ValueError( + 'Got {}. Very likely wrong parameters were passed as "job_kwargs" in {} constructor:\n' + "job_kwargs={}".format( + message_orig, self.__class__.__name__, self._job_kwargs + ) + ) from exc @property def job_header(self): @@ -623,27 +633,3 @@ def adapt( if maximum_jobs is not None: kwargs["maximum"] = maximum_jobs * self._dummy_job.worker_processes return super().adapt(*args, **kwargs) - - @classmethod - def _allowed_parameters(cls, func): - sig = inspect.signature(func) - return { - k: v for k, v in sig.parameters.items() if v.kind == v.POSITIONAL_OR_KEYWORD - } - - def validate_job_kwargs(self): - allowed_parameters = set(self._allowed_parameters(Job.__init__)).union( - self._allowed_parameters(self.job_cls.__init__) - ) - # We don't want to list self as an allowed parameter - allowed_parameters.remove("self") - wrong_parameter_names = [ - kw for kw in self._job_kwargs if kw not in allowed_parameters - ] - wrong_parameters = {k: self._job_kwargs[k] for k in wrong_parameter_names} - if wrong_parameters: - raise ValueError( - "Wrong parameters: {}.\nHere are the list of allowed parameters: {}".format( - wrong_parameters, sorted(allowed_parameters) - ) - ) diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index b5fecd20..d15f5ce1 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -401,17 +401,14 @@ def test_import_scheduler_options_from_config(Cluster): @pytest.mark.parametrize("Cluster", all_clusters) def test_wrong_parameter_error(Cluster): match = re.compile( - "Wrong parameters.+wrong_parameter.+another_wrong_parameter.+" - "list of allowed parameters.+cores.+memory", + "unexpected keyword argument.+wrong_parameter.+" + "{}.+job_kwargs.+cores.+memory.+" + "wrong_parameter.+wrong_parameter_value".format(Cluster.__name__), re.DOTALL, ) with pytest.raises(ValueError, match=match): create_cluster_func( - Cluster, - cores=1, - memory="1GB", - wrong_parameter="asdf", - another_wrong_parameter="asdf", + Cluster, cores=1, memory="1GB", wrong_parameter="wrong_parameter_value", ) From b95ec05f5e2cfc6d20453f535ad80e7625e70a35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Tue, 26 May 2020 12:22:46 +0200 Subject: [PATCH 11/14] Tweak test by using == rather than is --- dask_jobqueue/tests/test_jobqueue_core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index d15f5ce1..09907411 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -430,8 +430,8 @@ def test_security(): with LocalCluster( cores=1, memory="1GB", security=security, protocol="tls" ) as cluster: - assert cluster.security is security - assert cluster.scheduler_spec["options"]["security"] is security + assert cluster.security == security + assert cluster.scheduler_spec["options"]["security"] == security job_script = cluster.job_script() assert "--tls-key {}".format(key) in job_script assert "--tls-cert {}".format(cert) in job_script From 44ad88ffb489350845a2a52933c79c287530685a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Tue, 26 May 2020 12:40:53 +0200 Subject: [PATCH 12/14] Tweak error message. --- dask_jobqueue/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 4e14c465..1d34420b 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -564,7 +564,7 @@ def _dummy_job(self): raise message_orig = match.group(1) raise ValueError( - 'Got {}. Very likely wrong parameters were passed as "job_kwargs" in {} constructor:\n' + 'Got {}. Very likely this unexpected parameter was passed in "job_kwargs" in the {} constructor:\n' "job_kwargs={}".format( message_orig, self.__class__.__name__, self._job_kwargs ) From 5b78e53e50408083e7c82d8a19366dd8e96fb795 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Tue, 26 May 2020 12:44:15 +0200 Subject: [PATCH 13/14] Add comment --- dask_jobqueue/core.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 1d34420b..b428be35 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -559,6 +559,9 @@ def _dummy_job(self): **self._job_kwargs ) except TypeError as exc: + # Very likely this error happened because an unexpected parameter + # was used in the JobQueueCluster constructor, construct a more + # user-friendly error message. match = re.search("(unexpected keyword argument.+)", str(exc)) if not match: raise From 5aa17459c44d9c807d62c0baa44d802664fa0336 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Tue, 26 May 2020 12:55:31 +0200 Subject: [PATCH 14/14] Tweak comment --- dask_jobqueue/core.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index b428be35..5d6ed4fe 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -559,9 +559,9 @@ def _dummy_job(self): **self._job_kwargs ) except TypeError as exc: - # Very likely this error happened because an unexpected parameter - # was used in the JobQueueCluster constructor, construct a more - # user-friendly error message. + # Very likely this error happened in the self.job_cls constructor + # because an unexpected parameter was used in the JobQueueCluster + # constructor. The next few lines builds a more user-friendly error message. match = re.search("(unexpected keyword argument.+)", str(exc)) if not match: raise