Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ dependencies:
- black
- pytest
- pytest-asyncio
- cryptography
Comment thread
guillaumeeb marked this conversation as resolved.
2 changes: 1 addition & 1 deletion ci/htcondor.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ function jobqueue_before_install {

function jobqueue_install {
cd ./ci/htcondor
docker-compose exec -T submit /bin/bash -c "cd /dask-jobqueue; pip3 install -e .;chown -R submituser ."
docker-compose exec -T submit /bin/bash -c "cd /dask-jobqueue; pip3 install -e .[test]; chown -R submituser ."
Comment thread
guillaumeeb marked this conversation as resolved.
cd -
}

Expand Down
10 changes: 6 additions & 4 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ def pytest_addoption(parser):
def pytest_configure(config):
# register an additional marker
config.addinivalue_line(
"markers", "env(name): mark test to run only on named environment"
"markers", "env(NAME): only run test if environment NAME matches"
Comment thread
guillaumeeb marked this conversation as resolved.
)


def pytest_runtest_setup(item):
envnames = [mark.args[0] for mark in item.iter_markers(name="env")]
if envnames:
if item.config.getoption("-E") not in envnames:
pytest.skip("test requires env in %r" % envnames)
if (item.config.getoption("-E") is None and envnames) or (
Comment thread
guillaumeeb marked this conversation as resolved.
item.config.getoption("-E") is not None
and item.config.getoption("-E") not in envnames
):
pytest.skip("test requires env in %r" % envnames)


@pytest.fixture(autouse=True)
Expand Down
38 changes: 36 additions & 2 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import sys
import weakref
import abc
import tempfile
import copy

import dask

Expand All @@ -17,6 +19,7 @@
from distributed.deploy.spec import ProcessInterface, SpecCluster
from distributed.deploy.local import nprocesses_nthreads
from distributed.scheduler import Scheduler
from distributed.security import Security
from distributed.utils import tmpfile

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -220,6 +223,7 @@ def __init__(
extra = extra + ["--protocol", protocol]
if security:
worker_security_dict = security.get_tls_config_for_role("worker")

security_command_line_list = [
["--tls-" + key.replace("_", "-"), value]
for key, value in worker_security_dict.items()
Expand Down Expand Up @@ -450,7 +454,7 @@ def __init__(
scheduler_cls=Scheduler, # Use local scheduler for now
# Options for both scheduler and workers
interface=None,
protocol="tcp://",
protocol=None,
Comment thread
guillaumeeb marked this conversation as resolved.
# Job keywords
config_name=None,
**job_kwargs
Expand Down Expand Up @@ -500,6 +504,17 @@ def __init__(
"jobqueue.%s.scheduler-options" % config_name, {}
)

if protocol is None and security is not None:
protocol = "tls://"
if security is None and protocol is not None and protocol.startswith("tls"):
try:
security = Security.temporary()
except ImportError:
Comment thread
guillaumeeb marked this conversation as resolved.
raise ImportError(
"In order to use TLS without pregenerated certificates `cryptography` is required,"
"please install it using either pip or conda"
)

default_scheduler_options = {
"protocol": protocol,
"dashboard_address": ":8787",
Expand All @@ -521,7 +536,26 @@ def __init__(
job_kwargs["config_name"] = config_name
job_kwargs["interface"] = interface
job_kwargs["protocol"] = protocol
job_kwargs["security"] = security
job_kwargs["security"] = copy.copy(security)
Comment thread
guillaumeeb marked this conversation as resolved.

if security is not None:
worker_security_dict = job_kwargs["security"].get_tls_config_for_role(
"worker"
)
for key, value in worker_security_dict.items():
# dump worker in-memory keys for use in job_script
if value is not None and "\n" in value:
f = tempfile.NamedTemporaryFile(mode="wt")
# make sure that tmpfile survives by keeping a reference
setattr(self, "_job_" + key, f)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is ugly, but I having to clean up manually seemed more ugly. Ideas are welcome

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain what must be done here? Maybe @jacobtomlinson understands better.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am setting a reference to keep to the temp file from being deconstructed (which triggers its removal)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it's about the problem underlined in #520 again?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you at least do this part in a separated function?

And just to be sure, does this creates a file in /tmp directory (which would not been shared with workers)? Or is this in the job execution folder? I guess it needs to be created in a shared folder?

f.write(value)
f.flush()
setattr(
job_kwargs["security"],
"tls_" + ("worker_" if key != "ca_file" else "") + key,
f.name,
)

self._job_kwargs = job_kwargs

worker = {"cls": self.job_cls, "options": self._job_kwargs}
Expand Down
22 changes: 19 additions & 3 deletions dask_jobqueue/tests/test_jobqueue_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,11 @@ def test_security():
require_encryption=True,
)

with LocalCluster(
cores=1, memory="1GB", security=security, protocol="tls"
) as cluster:
Comment thread
guillaumeeb marked this conversation as resolved.
with LocalCluster(cores=1, memory="1GB", security=security) as cluster:
assert cluster.security == security
assert cluster.scheduler_spec["options"]["security"] == security
job_script = cluster.job_script()
assert "tls://" in job_script
assert "--tls-key {}".format(key) in job_script
assert "--tls-cert {}".format(cert) in job_script
assert "--tls-ca-file {}".format(cert) in job_script
Expand All @@ -442,3 +441,20 @@ def test_security():
future = client.submit(lambda x: x + 1, 10)
result = future.result()
assert result == 11


def test_security_temporary():
with LocalCluster(cores=1, memory="1GB", protocol="tls") as cluster:
assert cluster.security
assert cluster.scheduler_spec["options"]["security"] == cluster.security
job_script = cluster.job_script()
assert "tls://" in job_script
assert "--tls-key" in job_script
assert "--tls-cert" in job_script
assert "--tls-ca-file" in job_script

cluster.scale(jobs=1)
with Client(cluster) as client:
future = client.submit(lambda x: x + 1, 10)
result = future.result()
assert result == 11
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
packages=["dask_jobqueue"],
include_package_data=True,
install_requires=install_requires,
tests_require=["pytest >= 2.7.1"],
Comment thread
guillaumeeb marked this conversation as resolved.
extras_require={"test": ["pytest >= 2.7.1", "pytest-asyncio", "cryptography"]},
long_description=long_description,
zip_safe=False,
)