10 changes: 10 additions & 0 deletions ci/slurm.sh
@@ -11,6 +11,16 @@ function jobqueue_before_install {

docker ps -a
docker images
show_network_interfaces
}

function show_network_interfaces {
for c in slurmctld c1 c2; do
echo '------------------------------------------------------------'
echo docker container: $c
docker exec -it $c python -c 'import psutil; print(psutil.net_if_addrs().keys())'
echo '------------------------------------------------------------'
done
}

function jobqueue_install {
2 changes: 2 additions & 0 deletions ci/slurm/Dockerfile
@@ -1,5 +1,7 @@
FROM giovtorres/slurm-docker-cluster

RUN yum install -y iproute

RUN curl -o miniconda.sh https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh && \
bash miniconda.sh -f -b -p /opt/anaconda && \
/opt/anaconda/bin/conda clean -tipy && \
27 changes: 27 additions & 0 deletions ci/slurm/docker-compose.yml
@@ -12,6 +12,8 @@ services:
MYSQL_PASSWORD: password
volumes:
- var_lib_mysql:/var/lib/mysql
networks:
common-network:

slurmdbd:
build: .
@@ -26,6 +28,8 @@ services:
- "6819"
depends_on:
- mysql
networks:
common-network:

slurmctld:
build: .
@@ -42,6 +46,11 @@ services:
- "6817"
depends_on:
- "slurmdbd"
networks:
common-network:
ipv4_address: 10.1.1.10
cap_add:
- NET_ADMIN

c1:
build: .
@@ -57,6 +66,11 @@ services:
- "6818"
depends_on:
- "slurmctld"
networks:
common-network:
ipv4_address: 10.1.1.11
cap_add:
- NET_ADMIN

c2:
build: .
@@ -72,10 +86,23 @@ services:
- "6818"
depends_on:
- "slurmctld"
networks:
common-network:
ipv4_address: 10.1.1.12
cap_add:
- NET_ADMIN

volumes:
etc_munge:
etc_slurm:
slurm_jobdir:
var_lib_mysql:
var_log_slurm:

networks:
common-network:
driver: bridge
ipam:
driver: default
config:
- subnet: 10.1.1.0/24
8 changes: 8 additions & 0 deletions ci/slurm/start-slurm.sh
@@ -1,9 +1,17 @@
#!/bin/bash

docker-compose up --build -d

while [ `./register_cluster.sh 2>&1 | grep "sacctmgr: error" | wc -l` -ne 0 ]
do
echo "Waiting for SLURM cluster to become ready";
sleep 2
done
echo "SLURM properly configured"

# On some clusters the login node does not have the same network interface as
# the compute nodes. The next three lines make it possible to test this edge
# case by adding separate interfaces on the worker and scheduler nodes.
docker exec slurmctld ip addr add 10.1.1.20/24 dev eth0 label eth0:scheduler
docker exec c1 ip addr add 10.1.1.21/24 dev eth0 label eth0:worker
docker exec c2 ip addr add 10.1.1.22/24 dev eth0 label eth0:worker
68 changes: 50 additions & 18 deletions dask_jobqueue/core.py
@@ -15,7 +15,7 @@
from distributed.deploy.spec import ProcessInterface, SpecCluster
from distributed.deploy.local import nprocesses_nthreads
from distributed.scheduler import Scheduler
from distributed.utils import format_bytes, parse_bytes, tmpfile, get_ip_interface
from distributed.utils import format_bytes, parse_bytes, tmpfile

logger = logging.getLogger(__name__)

@@ -30,7 +30,11 @@
By default, ``process ~= sqrt(cores)`` so that the number of processes
and the number of threads per process is roughly the same.
interface : str
Network interface like 'eth0' or 'ib0'.
Network interface like 'eth0' or 'ib0'. This will be used both for the
Dask scheduler and the Dask workers interface. If you need a different
interface for the Dask scheduler you can pass it through
the ``scheduler_options`` argument:
``interface=your_worker_interface, scheduler_options={'interface': your_scheduler_interface}``.
nanny : bool
Whether or not to start a nanny process
local_directory : str
@@ -69,8 +73,12 @@
Whether or not to run this cluster object with the async/await syntax
security : Security
A dask.distributed security object if you're using TLS/SSL
dashboard_address : str or int
An address like ":8787" on which to host the Scheduler's dashboard
scheduler_options : dict
Used to pass additional arguments to Dask Scheduler. For example use
``scheduler_options={'dashboard_address': ':12435'}`` to specify which
port the web dashboard should use or ``scheduler_options={'host': 'your-host'}``
to specify the host the Dask scheduler should run on. See
:class:`distributed.Scheduler` for more details.
""".strip()


@@ -202,9 +210,6 @@ def __init__(

if interface:
extra = extra + ["--interface", interface]
kwargs.setdefault("host", get_ip_interface(interface))

@lesteve (Member Author), Mar 16, 2020

After looking at this in more detail, I am reasonably sure that host in kwargs is not used at all, so this code can be safely removed. In other words, this is another manifestation of #386.

This code was introduced in #135. At that time the kwargs were passed to LocalCluster. Nowadays the kwargs are passed to the job_cls. It does not make sense to pass host=something to the worker arguments when something is computed on the node where the scheduler runs (similar comment as #382 (comment)) ...

else:
kwargs.setdefault("host", "")

# Keep information on process, cores, and memory, for use in subclasses
self.worker_memory = parse_bytes(memory) if memory is not None else None
@@ -420,13 +425,15 @@ def __init__(
silence_logs="error",
name=None,
asynchronous=False,
# Scheduler keywords
interface=None,
# Scheduler-only keywords
dashboard_address=None,
host=None,
scheduler_options=None,
# Options for both scheduler and workers
interface=None,
protocol="tcp://",
dashboard_address=":8787",
config_name=None,
# Job keywords
config_name=None,
**kwargs
):
self.status = "created"
@@ -447,22 +454,47 @@
)
)

if dashboard_address is not None:

@lesteve (Member Author), Mar 4, 2020

No longer accepting host and dashboard_address directly is a breaking change, but it feels cleaner to pass all the scheduler options through scheduler_options.

Also, if we kept host, for example, users could pass conflicting values at different levels (e.g. host as a keyword plus interface in scheduler_options), which would make a user-friendly error hard to write (and/or the code a bit ugly).

If you think that's too harsh, I can make it a warning until the 0.8 release!

Member

What feels weird here is keeping host and dashboard_address in the constructor kwargs but throwing an exception when they are used. Maybe we should add some documentation or a comment, or simply remove them and catch them through the **kwargs argument? Otherwise we should handle them and log a deprecation warning.

I'm not really familiar with Python best practices in this case.

Member Author

I can see two solutions:

  1. raise an informative error if people use host or dashboard_address, telling them to pass these through scheduler_options. What I had in mind was to have this user-friendly error in 0.7.1 and remove host and dashboard_address in 0.8.
  2. warn if people use host or dashboard_address, telling them that this will not be supported in 0.8, and then combine them with scheduler_options to do the right thing.

I tend to think 1. is easier because you don't need the combining logic between host, dashboard_address and scheduler_options, which can be tricky. For example, if you use host together with scheduler_options={'interface': 'eth0'}, which one should take priority, and should we raise an error in this case?

Also my feeling is that people ignore warnings until something breaks, so 2. is not that helpful for users in the end.

> I'm not really familiar with Python best practices in this case.

It completely depends on the project. For example, in scikit-learn we have a strong backward-compatibility policy: we would have warnings for two major releases before we can break user code. I feel in dask-jobqueue we can afford to be less conservative.
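To make the trade-off between the two solutions concrete, here is a minimal sketch. The helper name `merge_legacy_scheduler_kwargs` and its `strict` flag are hypothetical and not part of dask-jobqueue; the sketch only illustrates why the warning path needs extra conflict handling that the error path avoids.

```python
import warnings


def merge_legacy_scheduler_kwargs(
    host=None, dashboard_address=None, scheduler_options=None, strict=True
):
    """Hypothetical helper contrasting solution 1 (strict) and solution 2."""
    options = dict(scheduler_options or {})
    legacy = {"host": host, "dashboard_address": dashboard_address}

    for name, value in legacy.items():
        if value is None:
            continue
        if strict:
            # Solution 1: fail fast and tell the user what to write instead.
            raise ValueError(
                "Please pass {0!r} through 'scheduler_options': use "
                "scheduler_options={{{0!r}: {1!r}}}".format(name, value)
            )
        # Solution 2: warn and merge. This needs a policy for conflicts,
        # e.g. legacy host combined with scheduler_options={'interface': ...}.
        if name in options or (name == "host" and "interface" in options):
            raise ValueError(
                "{0!r} conflicts with 'scheduler_options'".format(name)
            )
        warnings.warn(
            "{0!r} is deprecated, pass it through 'scheduler_options'".format(name),
            FutureWarning,
        )
        options[name] = value
    return options
```

Even in this small sketch the non-strict branch has to pick a conflict policy, which is the combining logic that solution 1 sidesteps.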

raise ValueError(
"Please pass 'dashboard_address' through 'scheduler_options': use\n"
'cluster = {0}(..., scheduler_options={{"dashboard_address": ":12345"}}) rather than\n'
'cluster = {0}(..., dashboard_address=":12345")'.format(
self.__class__.__name__
)
)

if host is not None:
raise ValueError(
"Please pass 'host' through 'scheduler_options': use\n"
'cluster = {0}(..., scheduler_options={{"host": "your-host"}}) rather than\n'
'cluster = {0}(..., host="your-host")'.format(self.__class__.__name__)
)

default_config_name = self.job_cls.default_config_name()
if config_name is None:
config_name = default_config_name

if interface is None:
interface = dask.config.get("jobqueue.%s.interface" % config_name)
if scheduler_options is None:
scheduler_options = {}

default_scheduler_options = {
"protocol": protocol,
"dashboard_address": ":8787",
"security": security,
}
# scheduler_options overrides parameters common to both workers and scheduler
scheduler_options = dict(default_scheduler_options, **scheduler_options)

# Use the same network interface as the workers if scheduler ip has not
# been set through scheduler_options via 'host' or 'interface'
if "host" not in scheduler_options and "interface" not in scheduler_options:
scheduler_options["interface"] = interface

Member

Shouldn't we just put interface in default_scheduler_options? Wouldn't it be overridden if defined in scheduler_options?

Or is this because of a problem in distributed if host and interface do not match?

@lesteve (Member Author), Mar 8, 2020

I should probably test this edge case indeed. My line of reasoning is as follows (I tried to explain it in the code comment, but the wording can probably be improved): the interface parameter applies primarily to workers. It should apply to the scheduler only if the scheduler IP is not already set by either host or interface in the scheduler_options dict.
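The difference between the two approaches can be sketched in isolation. The functions below are standalone illustrations that mirror the logic in this diff; their names are hypothetical, and how distributed reacts when both host and interface are present is an assumption not verified here.

```python
def build_scheduler_options(
    interface=None, protocol="tcp://", security=None, scheduler_options=None
):
    """Mirror of the PR logic: interface is only a fallback for the scheduler."""
    options = {
        "protocol": protocol,
        "dashboard_address": ":8787",
        "security": security,
    }
    options.update(scheduler_options or {})
    # Only reuse the worker interface if the user did not choose the
    # scheduler address through 'host' or 'interface'.
    if "host" not in options and "interface" not in options:
        options["interface"] = interface
    return options


def build_with_interface_as_default(
    interface=None, protocol="tcp://", security=None, scheduler_options=None
):
    """Alternative from the review question: interface as a plain default."""
    options = {
        "protocol": protocol,
        "interface": interface,
        "dashboard_address": ":8787",
        "security": security,
    }
    options.update(scheduler_options or {})
    return options


user_options = {"host": "login-node"}  # hypothetical host name
print(build_scheduler_options("ib0", scheduler_options=user_options))
print(build_with_interface_as_default("ib0", scheduler_options=user_options))
```

With the plain-default variant, passing only host leaves the worker interface in the scheduler options next to host, so the scheduler would receive two possibly contradictory address hints. The fallback variant passes exactly one of them.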


scheduler = {
"cls": Scheduler, # Use local scheduler for now
"options": {
"protocol": protocol,
"interface": interface,
"host": host,
"dashboard_address": dashboard_address,
"security": security,
},
"options": scheduler_options,
}

kwargs["config_name"] = config_name
77 changes: 76 additions & 1 deletion dask_jobqueue/tests/test_jobqueue_core.py
@@ -84,7 +84,7 @@ def test_forward_ip():
cores=8,
memory="28GB",
name="dask-worker",
host=ip,
scheduler_options={"host": ip},
) as cluster:
assert cluster.scheduler.ip == ip

@@ -268,3 +268,78 @@ def test_default_number_of_worker_processes(Cluster):
with Cluster(cores=6, memory="4GB") as cluster:
assert " --nprocs 3" in cluster.job_script()
assert " --nthreads 2" in cluster.job_script()


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
def test_scheduler_options(Cluster):
net_if_addrs = psutil.net_if_addrs()
interface = list(net_if_addrs.keys())[0]
port = 8804

with Cluster(
cores=1, memory="1GB", scheduler_options={"interface": interface, "port": port}
) as cluster:
scheduler_options = cluster.scheduler_spec["options"]
assert scheduler_options["interface"] == interface
assert scheduler_options["port"] == port


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
def test_scheduler_options_interface(Cluster):
net_if_addrs = psutil.net_if_addrs()
scheduler_interface = list(net_if_addrs.keys())[0]
worker_interface = "worker-interface"
scheduler_host = socket.gethostname()

with Cluster(cores=1, memory="1GB", interface=scheduler_interface) as cluster:
scheduler_options = cluster.scheduler_spec["options"]
worker_options = cluster.new_spec["options"]
assert scheduler_options["interface"] == scheduler_interface
assert worker_options["interface"] == scheduler_interface

with Cluster(
cores=1,
memory="1GB",
interface=worker_interface,
scheduler_options={"interface": scheduler_interface},
) as cluster:
scheduler_options = cluster.scheduler_spec["options"]
worker_options = cluster.new_spec["options"]
assert scheduler_options["interface"] == scheduler_interface
assert worker_options["interface"] == worker_interface

with Cluster(
cores=1,
memory="1GB",
interface=worker_interface,
scheduler_options={"host": scheduler_host},
) as cluster:
scheduler_options = cluster.scheduler_spec["options"]
worker_options = cluster.new_spec["options"]
assert scheduler_options.get("interface") is None
assert scheduler_options["host"] == scheduler_host
assert worker_options["interface"] == worker_interface


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
def test_cluster_error_scheduler_arguments_should_use_scheduler_options(Cluster):
scheduler_host = socket.gethostname()
message_template = "pass {!r} through 'scheduler_options'"

message = message_template.format("host")
with pytest.raises(ValueError, match=message):
with Cluster(cores=1, memory="1GB", host=scheduler_host):
pass

message = message_template.format("dashboard_address")
with pytest.raises(ValueError, match=message):
with Cluster(cores=1, memory="1GB", dashboard_address=":8787"):
pass
19 changes: 19 additions & 0 deletions dask_jobqueue/tests/test_slurm.py
@@ -193,3 +193,22 @@ def test_config_name_slurm_takes_custom_config():
with dask.config.set({"jobqueue.slurm-config-name": conf}):
with SLURMCluster(config_name="slurm-config-name") as cluster:
assert cluster.job_name == "myname"


@pytest.mark.env("slurm")
def test_different_interfaces_on_scheduler_and_workers(loop):
with SLURMCluster(
walltime="00:02:00",
cores=1,
memory="2GB",
interface="eth0:worker",
scheduler_options={"interface": "eth0:scheduler"},
loop=loop,
) as cluster:
cluster.scale(jobs=1)
with Client(cluster) as client:
future = client.submit(lambda x: x + 1, 10)

client.wait_for_workers(1)

assert future.result(QUEUE_WAIT) == 11
5 changes: 5 additions & 0 deletions docs/source/changelog.rst
@@ -4,6 +4,11 @@ Changelog
Development version
-------------------

- all cluster classes: add ``scheduler_options`` parameter, which allows passing
parameters to the Dask scheduler. For example
``scheduler_options={'interface': 'eth0', 'dashboard_address': ':12435'}``
(:pr:`384`). Breaking change: using the ``host`` or ``dashboard_address``
arguments raises an error. They have to be passed through
``scheduler_options``.
- all cluster classes: ``processes`` parameter default has changed. By default,
``processes ~= sqrt(cores)`` so that the number of processes and the number
of threads per process is roughly the same. Old default was to use one
8 changes: 5 additions & 3 deletions docs/source/interactive.rst
@@ -129,10 +129,12 @@ dashboard is valuable to help you understand the state of your computation and
cluster.

Typically, the dashboard is served on a separate port from Jupyter, and so can
be used whether you choose to use Jupyter or not. If you want to open up a
be used whether you choose to use Jupyter or not. If you want to open up a
connection to see the dashboard you can do so with SSH Tunneling as described
above. The dashboard's default port is at ``8787``, and is configurable with
the ``dashboard_address=`` keyword to the Dask Jobqueue cluster objects.
above. The dashboard's default port is at ``8787``, and is configurable by
using the ``scheduler_options`` parameter in the Dask Jobqueue cluster object.
For example ``scheduler_options={'dashboard_address': ':12435'}`` would use
12435 for the web dashboard port.
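As a rough illustration of how the configured port ties into SSH tunneling, the sketch below derives the port from a ``scheduler_options`` dict and builds a matching ``ssh -L`` command. Both helper names and the ``user@login-node`` target are hypothetical, and the sketch assumes the scheduler runs on the machine you tunnel to.

```python
def dashboard_port(scheduler_options=None, default=":8787"):
    """Return the dashboard port from a scheduler_options dict (hypothetical helper)."""
    address = (scheduler_options or {}).get("dashboard_address", default)
    # Accept ":12435", "0.0.0.0:12435" or a bare integer such as 12435.
    return int(str(address).rsplit(":", 1)[-1])


def tunnel_command(scheduler_options, login_node):
    """Build an SSH local port-forwarding command for the dashboard."""
    port = dashboard_port(scheduler_options)
    return "ssh -N -L {0}:localhost:{0} {1}".format(port, login_node)


options = {"dashboard_address": ":12435"}
print(tunnel_command(options, "user@login-node"))
```

The same ``options`` dict would be the one passed as ``scheduler_options`` to the cluster object.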

However, Jupyter is also able to proxy the dashboard connection through the
Jupyter server, allowing you to access the dashboard at