diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 97eb0e4f..5d6ed4fe 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -136,6 +136,8 @@ def __init__( memory=None, processes=None, nanny=True, + protocol=None, + security=None, interface=None, death_timeout=None, local_directory=None, @@ -147,7 +149,6 @@ def __init__( python=sys.executable, job_name=None, config_name=None, - **kwargs ): self.scheduler = scheduler self.job_id = None @@ -210,6 +211,18 @@ def __init__( if interface: extra = extra + ["--interface", interface] + if protocol: + extra = extra + ["--protocol", protocol] + if security: + worker_security_dict = security.get_tls_config_for_role("worker") + security_command_line_list = [ + ["--tls-" + key.replace("_", "-"), value] + for key, value in worker_security_dict.items() + # 'ciphers' parameter does not have a command-line equivalent + if key != "ciphers" + ] + security_command_line = sum(security_command_line_list, []) + extra = extra + security_command_line # Keep information on process, cores, and memory, for use in subclasses self.worker_memory = parse_bytes(memory) if memory is not None else None @@ -434,7 +447,7 @@ def __init__( protocol="tcp://", # Job keywords config_name=None, - **kwargs + **job_kwargs ): self.status = "created" @@ -499,14 +512,17 @@ def __init__( "options": scheduler_options, } - kwargs["config_name"] = config_name - kwargs["interface"] = interface - kwargs["protocol"] = protocol - kwargs["security"] = security - self._kwargs = kwargs - worker = {"cls": self.job_cls, "options": kwargs} - if "processes" in kwargs and kwargs["processes"] > 1: - worker["group"] = ["-" + str(i) for i in range(kwargs["processes"])] + job_kwargs["config_name"] = config_name + job_kwargs["interface"] = interface + job_kwargs["protocol"] = protocol + job_kwargs["security"] = security + self._job_kwargs = job_kwargs + + worker = {"cls": self.job_cls, "options": self._job_kwargs} + if "processes" in self._job_kwargs and self._job_kwargs["processes"] > 1: + worker["group"] = [ + "-" + str(i) for i in range(self._job_kwargs["processes"]) + ] self._dummy_job # trigger property to ensure that the job is valid @@ -514,6 +530,7 @@ def __init__( scheduler=scheduler, worker=worker, loop=loop, + security=security, silence_logs=silence_logs, asynchronous=asynchronous, name=name, @@ -535,11 +552,26 @@ def _dummy_job(self): address = self.scheduler.address # Have we already connected? except AttributeError: address = "tcp://:8786" - return self.job_cls( - scheduler=address or "tcp://:8786", - name="name", - **self._kwargs - ) + try: + return self.job_cls( + address or "tcp://:8786", + name="name", + **self._job_kwargs + ) + except TypeError as exc: + # Very likely this error happened in the self.job_cls constructor + # because an unexpected parameter was used in the JobQueueCluster + # constructor. The next few lines builds a more user-friendly error message. + match = re.search("(unexpected keyword argument.+)", str(exc)) + if not match: + raise + message_orig = match.group(1) + raise ValueError( + 'Got {}. Very likely this unexpected parameter was passed in "job_kwargs" in the {} constructor:\n' + "job_kwargs={}".format( + message_orig, self.__class__.__name__, self._job_kwargs + ) + ) from exc @property def job_header(self): diff --git a/dask_jobqueue/htcondor.py b/dask_jobqueue/htcondor.py index b2c8e082..522fbb25 100644 --- a/dask_jobqueue/htcondor.py +++ b/dask_jobqueue/htcondor.py @@ -40,10 +40,10 @@ def __init__( disk=None, job_extra=None, config_name=None, - **kwargs + **base_class_kwargs ): super().__init__( - scheduler=scheduler, name=name, config_name=config_name, **kwargs + scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs ) if disk is None: @@ -60,7 +60,7 @@ def __init__( else: self.job_extra = job_extra - env_extra = kwargs.get("env_extra", None) + env_extra = base_class_kwargs.get("env_extra", None) if env_extra is None: env_extra = dask.config.get( "jobqueue.%s.env-extra" % self.config_name, default=[] diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index b2fb7036..be590f3f 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -32,10 +32,10 @@ def __init__( lsf_units=None, config_name=None, use_stdin=None, - **kwargs + **base_class_kwargs ): super().__init__( - scheduler=scheduler, name=name, config_name=config_name, **kwargs + scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs ) if queue is None: diff --git a/dask_jobqueue/oar.py b/dask_jobqueue/oar.py index 6fdf029c..13e8cceb 100644 --- a/dask_jobqueue/oar.py +++ b/dask_jobqueue/oar.py @@ -26,10 +26,10 @@ def __init__( walltime=None, job_extra=None, config_name=None, - **kwargs + **base_class_kwargs ): super().__init__( - scheduler=scheduler, name=name, config_name=config_name, **kwargs + scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs ) if queue is None: diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 43f98eaa..c01d7d3b 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -49,10 +49,10 @@ def __init__( walltime=None, job_extra=None, config_name=None, - **kwargs + **base_class_kwargs ): super().__init__( - scheduler=scheduler, name=name, config_name=config_name, **kwargs + scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs ) if queue is None: diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index c9414e1f..10fa24a6 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -22,10 +22,10 @@ def __init__( walltime=None, job_extra=None, config_name=None, - **kwargs + **base_class_kwargs ): super().__init__( - scheduler=scheduler, name=name, config_name=config_name, **kwargs + scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs ) if queue is None: diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index c498b2cc..6e6c2139 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -25,10 +25,10 @@ def __init__( job_mem=None, job_extra=None, config_name=None, - **kwargs + **base_class_kwargs ): super().__init__( - scheduler=scheduler, name=name, config_name=config_name, **kwargs + scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs ) if queue is None: diff --git a/dask_jobqueue/tests/ca.pem b/dask_jobqueue/tests/ca.pem new file mode 100644 index 00000000..46c112dd --- /dev/null +++ b/dask_jobqueue/tests/ca.pem @@ -0,0 +1,31 @@ +-----BEGIN CERTIFICATE----- +MIIFazCCA1OgAwIBAgIUHfc6OUT6HK590ABdtsRzglDZsfowDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMDA1MjUxNDQ5MzlaFw0zMDA1 +MjMxNDQ5MzlaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw +HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggIiMA0GCSqGSIb3DQEB +AQUAA4ICDwAwggIKAoICAQCb5PcZthd/Ssv4JpsxGUww9KsA+HknTQVAWUN85bC4 +BxLS8IKaE302yGBg8MK9w0z+qTOoMFNMqoo+Yqp7sDGbV/4U9qVPuJfbxddLPyQU +QenGXsyAMLEmVXCmQHXlcZCsIlY0je6c7o2bJw2oJPPktjMRNfHI+lWtzhMVOhWz +nKRMwikfaV80Vn80Jx7zY2A2kcsoGL16Y/GLp4AAhEGWtn3GFJDCCWpJM4fBeChZ +6wOxP/KnNsJZ2PeHb+yTXT347ZFRdKHB6DNDmiWk48BrPgwemKdePt+7YN1UmV88 +YCl54BecgGiu3Iav0gqCHv2JIDZtyUtfBA/rPXsUmRbus0ZW9ASCnVEbt9FtHQZ+ +e/BmLL6WDg58lGE4TeI+EaK1FqxbVEB3K5idrForTZvadKzf2licSpRP+wlRruhZ +k85lT50aY5tvQIPq+FnX2mQwosMln0hIPXvgqXDMNrSCzUZ9aCLL6PWx+wqLHq9p +dU/2upPM9AEiPh1M9nEXL+FdbQMB6wNbpo/L3yTfQ1pL3Z33KLDC9OfWCGymgarL +1uxeUbt6XeCB7Exb+1fT73RtjeO2QFTZrCjKfxR3vKIBA0/2qb0DfktAasekyx3U +TC2ZhT/wNP4LBP/l1Oasn47e6EPs2slZLPHZXfbdElsBGcGPDwh38RjSVq+fzm7b +bwIDAQABo1MwUTAdBgNVHQ4EFgQU+UmkXa3pmOtN4bN3LEm6Vbc2KVMwHwYDVR0j +BBgwFoAU+UmkXa3pmOtN4bN3LEm6Vbc2KVMwDwYDVR0TAQH/BAUwAwEB/zANBgkq +hkiG9w0BAQsFAAOCAgEAMNWu/3ci9SxiZ2YO/x4VBtvuhB2D0eAF71NXdJW+F2WN +ol+ut2mHyXeqJTqme6nwMkItuCk/M3/rtxJYa+i77PkRP+dv10jfOLNgc0nMTCKx +ll6AlhX3Mz4kt4/8g1h6oyee/Tau4VK9UTLzlxHiPSX4HrPR1g8jEyA5K9pwfX0p +RBLAlaStyemH/HkCX0HpG8PSJfEbmLsbJR5cOKtGAgt+mUnPjKGvuWOkohrOzFci +dMaWCULZX6PEdeJaXNerP7mR/BH3nOHSWja1zqr4t4h4e7hOXREseUrvUB9pHNkv +T2UrbbBMGElHVlHtxtaxdu/zNzL1DBxBkLjyw+SberFFQAQR5I0yCWOhrwzONzH6 +XPL81igvUao5BQ7quJpy58EPqGwALAkB5QDsvH4uD7Bgwp8IYAtcRWWGL8WnuEt5 +shS4yK4UXFCgUeg580+gZ7t1nH6f55/A7d/g0DdpvBBTZVYRJFpiJCnCTH6LbjI0 +XXe5vDubTxjLSZemP4w+w+Oe18U2tJ6DXb2rUZGoWECnfbYfVmO2hdLcgp2nenXl +dcRANXfZT9R34wzVx/LUJA9hjOgfFF38CC2RfOZr9bb1IjLD/X4V5SrxsfgnQ4P/ +/RKGyQ5dpeK/lpdgYJuqMgUffy8KDrQdMaBq8+0RdVL3nm5+tPFKcskMI2f/zyc= +-----END CERTIFICATE----- diff --git a/dask_jobqueue/tests/key.pem b/dask_jobqueue/tests/key.pem new file mode 100644 index 00000000..9c23d8d0 --- /dev/null +++ b/dask_jobqueue/tests/key.pem @@ -0,0 +1,52 @@ +-----BEGIN PRIVATE KEY----- +MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQCb5PcZthd/Ssv4 +JpsxGUww9KsA+HknTQVAWUN85bC4BxLS8IKaE302yGBg8MK9w0z+qTOoMFNMqoo+ +Yqp7sDGbV/4U9qVPuJfbxddLPyQUQenGXsyAMLEmVXCmQHXlcZCsIlY0je6c7o2b +Jw2oJPPktjMRNfHI+lWtzhMVOhWznKRMwikfaV80Vn80Jx7zY2A2kcsoGL16Y/GL +p4AAhEGWtn3GFJDCCWpJM4fBeChZ6wOxP/KnNsJZ2PeHb+yTXT347ZFRdKHB6DND +miWk48BrPgwemKdePt+7YN1UmV88YCl54BecgGiu3Iav0gqCHv2JIDZtyUtfBA/r +PXsUmRbus0ZW9ASCnVEbt9FtHQZ+e/BmLL6WDg58lGE4TeI+EaK1FqxbVEB3K5id +rForTZvadKzf2licSpRP+wlRruhZk85lT50aY5tvQIPq+FnX2mQwosMln0hIPXvg +qXDMNrSCzUZ9aCLL6PWx+wqLHq9pdU/2upPM9AEiPh1M9nEXL+FdbQMB6wNbpo/L +3yTfQ1pL3Z33KLDC9OfWCGymgarL1uxeUbt6XeCB7Exb+1fT73RtjeO2QFTZrCjK +fxR3vKIBA0/2qb0DfktAasekyx3UTC2ZhT/wNP4LBP/l1Oasn47e6EPs2slZLPHZ +XfbdElsBGcGPDwh38RjSVq+fzm7bbwIDAQABAoICAHFc4ddfk9yr3oEYSdg9Zitf +cA6noSpUFtKBVtM3D/fypNyhqscyDubMdVFpIqPtpkq1bewLIDfq99Z/1ytUp+4n +4YsLBJFhUYSubG26f5j/iWkIPLunLNsMXHt4+oKbv7F80qUq5O5Xhr/heUvhez3A +xIfqa2VTrQRTi4rvDyLqcIuk0VSXQnUDxUJ+hEJG7IsiH9KLkxWyIc8FQc6eXjej +gviMset3/0M15q1onCcvACNftiukZVYCsZVabXWH423mC7tpDcu897JcIK20NJOH +rjZ9mY+uNvHCcZB4a0mzP9XxgBn9QqKNmJ+4JI4UzRdvRkU1kMqKYK0Wqy8CWCqf +kw7FrRb2AO94bK3CMQ+9G0AJCLD4tlh/Xw9DIh4h2iv3G3T9GVmH0fLnF2LDKy2G +7Z0oC65s3TjeO2a9SRvlgEmf28EujjCil0RZx6Dnni171GVARohdLZaGDquj5V26 +Sc2iTaHDXKcRCyntORJ7hyJt7PsCU7nj4dMLc4q35mW4Finxt0Is5w+IGIL+fKEI +0ZrTf6eBRNU4rLagkiCmhBydmJaj8jMfl7XVHXEoOT2AR0dk8XfIK/LO9d/5zTJA +y1Wx9THg5wUK2KoimQXg+qZFw06fEjlbzEl0DlZOdOrk2O1lX+cYrW0pbUH2OKOh +TGjMN3f45sCfDEvHQg35AoIBAQDPTDnv7ZiKDI5vpl6+uNnVQRwlKQ+QrD+FxrW3 +52b23eVcPP7kcZ4xDeJ0T6ZdGGJ41SRKUn6gsojQ02yReSn02/BZVmL/shEweXhM +VrHmMbwOUYGwcVmp9jjSTH4VVEl42oMpc8JCrEAKAda09ZuG4+PbFOtGBskWnBNE +uhJHlgL5CRODLmy+jZvafsTLHIqfDlq18jxyPz0SsaXGcV5Lm2Ob6DPRKvp8578X +ehwpJcJMvqAmzwflgVj+Qb2K0/ceDvjajC3BMcp63gjCAojTbbHrIrXzS0ClSjUg +U8sfUaXe0wOAqsKIl+evQtNwiv/2QMmvC1jHI7miMeu7MSJtAoIBAQDAhR7uAzJc +quFtjfbwCJswzAsQ8W4YOioTLWFW2W/+nS4MfFlxFEuOeJpvYmViD67UFJrCY9bO +xXyQvTQPN1iVFAjqG3Ur40/+zFGm1uXR0rF2f0oafceKodT2+TyDcoLkVoi2h93k +PfgCsen+ajwXYNH4utHN9QoykRKESzwWtJOcY2vBWxItFmumFborv/6oJkCMXWC8 +o5wjIJHXZ9q/bxvNNnENnXafbYWyKlwu6f0CXeTd4Dt4zD7Ab861/F0zP4GXlrR/ +ecPhqxaS5U51VfOZp4q5Ey8HbWxVENMA9z+4dp7P+/A2WLLdyLz23tYo7wDmpqQ0 +CUZ2YNeUSWvLAoIBAQCLxczm7uBEbNT5iUcm+ALA3Mb6c2YwWUP3kpVia9+sItAM +0n/XTpioYMFJRY9aBCAZczWl+1uwRNElZPk2WWkl1cqIokvcNpeKhMzNRENtgClZ +yjFU5AjeJcwIWFVHUm670zJPF+NrCzOey8CWgWidmjk/tioxLFAYM6J2W7QJmqdk +fW8vq3TdQyRMPd+5SARb3NTjC3MgYW1vlmK9nCFFf3+5VubhaUY+RBA/5zDnubL6 +Bip8IGoloIJ95ZvE6Mkd9mBrE8uiEU2CbQWgsw7I6JTng58Fbb8n9BJAOt+hvW7H +AKbC8eB7M1mffcKNhtuxkdurcE4q7/ax21EkBaw9AoIBAHdA/svIxyWH5GQMkG5X +pmovupsgMmZngTCn56f4wNsjWib50B2vyK3UHzXn6Y040b8llEfduG4U/vhZeyoB +yqlt46fAonAxOphG0D1c2LeEn9EbQDfwue4yGM1zzfxOrq3qvHz05IpBqKNiueOS +wu5oVyiP8O53X327R5ETWYFnEhjJrTH0y+mJ/dy/kLcRExntuAY6wXWYk1tfDXg1 +KNd0Z/BSTO12IMjY+vxGKRwWbVdN+jtGbxCA1E438//e94yLRic0f1KHhsL/S9hq +mpMsTt1bXx8NtxAOxBBdf5cVkS8eq3mCQmYnw4SGmCcEGfz5L8Gwb/6b0D379w4v +/MkCggEAEwIOqhgwUjscb1L9NTehFa3CaK8eQwkEvB7aQLhTbTUK4bigSd6f20Ju +3N/zmHhSkEm6Oq9+zrjm8QqgztrB3Zn+nCWu2n7BHFoCexg4Qov3xuKsC3MMh/oa +/xqgVgsFOgqk93YWv/6IvCHM8F3Ntk+rcUlt0M1VxXC8Ym/vRHy5VN12asI19K1M +UbZINK0bVAdEdwaB2OSx14yag9uiaXxKryZe7qTjcR2ELxksVo0nwX7h0NHA1S77 +Ubq57jxR5a/wI9VA5iTUZSUkjxfEdPUEAY1eDdp+Ogqcs1Zsa8Er0JUNPxqJzmlJ +Q2ypwp0CapUz0d43a47/izrjAtXzug== +-----END PRIVATE KEY----- diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index 39d97831..09907411 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -9,6 +9,9 @@ import dask +from distributed.security import Security +from distributed import Client + from dask_jobqueue import ( JobQueueCluster, PBSCluster, @@ -393,3 +396,49 @@ def test_import_scheduler_options_from_config(Cluster): scheduler_options = cluster.scheduler_spec["options"] assert scheduler_options.get("interface") == pass_scheduler_interface assert scheduler_options.get("port") is None + + +@pytest.mark.parametrize("Cluster", all_clusters) +def test_wrong_parameter_error(Cluster): + match = re.compile( + "unexpected keyword argument.+wrong_parameter.+" + "{}.+job_kwargs.+cores.+memory.+" + "wrong_parameter.+wrong_parameter_value".format(Cluster.__name__), + re.DOTALL, + ) + with pytest.raises(ValueError, match=match): + create_cluster_func( + Cluster, cores=1, memory="1GB", wrong_parameter="wrong_parameter_value", + ) + + +def test_security(): + dirname = os.path.dirname(__file__) + key = os.path.join(dirname, "key.pem") + cert = os.path.join(dirname, "ca.pem") + security = Security( + tls_ca_file=cert, + tls_scheduler_key=key, + tls_scheduler_cert=cert, + tls_worker_key=key, + tls_worker_cert=cert, + tls_client_key=key, + tls_client_cert=cert, + require_encryption=True, + ) + + with LocalCluster( + cores=1, memory="1GB", security=security, protocol="tls" + ) as cluster: + assert cluster.security == security + assert cluster.scheduler_spec["options"]["security"] == security + job_script = cluster.job_script() + assert "--tls-key {}".format(key) in job_script + assert "--tls-cert {}".format(cert) in job_script + assert "--tls-ca-file {}".format(cert) in job_script + + cluster.scale(jobs=1) + with Client(cluster, security=security) as client: + future = client.submit(lambda x: x + 1, 10) + result = future.result() + assert result == 11 diff --git a/dask_jobqueue/tests/test_oar.py b/dask_jobqueue/tests/test_oar.py index 5035852b..830b660e 100644 --- a/dask_jobqueue/tests/test_oar.py +++ b/dask_jobqueue/tests/test_oar.py @@ -19,8 +19,6 @@ def test_header(): processes=4, cores=8, memory="28GB", - job_cpu=16, - job_mem="100G", job_extra=["-t besteffort"], ) as cluster: assert "walltime=" in cluster.job_header diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index c017867b..99ad1e16 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -17,6 +17,8 @@ Development version ``threads_per_process=cores``. (:pr:`375`) - all cluster classes: ``interface`` was ignored when set in a config file. (:pr:`366`) +- all cluster classes: fix a bug that would allow to pass any named parameter without an error (:pr:`398`) +- all cluster classes: fix a bug where ``security`` was not correctly passed through (:pr:`398`) - ``LSFCluster``: switch to ``use_stdin=True`` by default (:pr:`388`). - ``LSFCluster``: add ``use_stdin`` to ``LSFCluster``. This switches between ``bsub < job_script`` and ``bsub job_script`` to launch a ``LSF`` job