diff --git a/ci/slurm.sh b/ci/slurm.sh index 065bfa02..b856858c 100644 --- a/ci/slurm.sh +++ b/ci/slurm.sh @@ -11,6 +11,16 @@ function jobqueue_before_install { docker ps -a docker images + show_network_interfaces +} + +function show_network_interfaces { + for c in slurmctld c1 c2; do + echo '------------------------------------------------------------' + echo docker container: $c + docker exec -it $c python -c 'import psutil; print(psutil.net_if_addrs().keys())' + echo '------------------------------------------------------------' + done } function jobqueue_install { diff --git a/ci/slurm/Dockerfile b/ci/slurm/Dockerfile index 7e8dfd1d..5120e310 100644 --- a/ci/slurm/Dockerfile +++ b/ci/slurm/Dockerfile @@ -1,5 +1,7 @@ FROM giovtorres/slurm-docker-cluster +RUN yum install -y iproute + RUN curl -o miniconda.sh https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh && \ bash miniconda.sh -f -b -p /opt/anaconda && \ /opt/anaconda/bin/conda clean -tipy && \ diff --git a/ci/slurm/docker-compose.yml b/ci/slurm/docker-compose.yml index 414ebfea..4764533f 100644 --- a/ci/slurm/docker-compose.yml +++ b/ci/slurm/docker-compose.yml @@ -12,6 +12,8 @@ services: MYSQL_PASSWORD: password volumes: - var_lib_mysql:/var/lib/mysql + networks: + common-network: slurmdbd: build: . @@ -26,6 +28,8 @@ services: - "6819" depends_on: - mysql + networks: + common-network: slurmctld: build: . @@ -42,6 +46,11 @@ services: - "6817" depends_on: - "slurmdbd" + networks: + common-network: + ipv4_address: 10.1.1.10 + cap_add: + - NET_ADMIN c1: build: . @@ -57,6 +66,11 @@ services: - "6818" depends_on: - "slurmctld" + networks: + common-network: + ipv4_address: 10.1.1.11 + cap_add: + - NET_ADMIN c2: build: . @@ -72,6 +86,11 @@ services: - "6818" depends_on: - "slurmctld" + networks: + common-network: + ipv4_address: 10.1.1.12 + cap_add: + - NET_ADMIN volumes: etc_munge: @@ -79,3 +98,11 @@ volumes: slurm_jobdir: var_lib_mysql: var_log_slurm: + +networks: + common-network: + driver: bridge + ipam: + driver: default + config: + - subnet: 10.1.1.0/24 diff --git a/ci/slurm/start-slurm.sh b/ci/slurm/start-slurm.sh index 5ba34471..b369f3aa 100755 --- a/ci/slurm/start-slurm.sh +++ b/ci/slurm/start-slurm.sh @@ -1,9 +1,17 @@ #!/bin/bash docker-compose up --build -d + while [ `./register_cluster.sh 2>&1 | grep "sacctmgr: error" | wc -l` -ne 0 ] do echo "Waiting for SLURM cluster to become ready"; sleep 2 done echo "SLURM properly configured" + +# On some clusters the login node does not have the same interface as the +# compute nodes. The next three lines allow to test this edge case by adding +# separate interfaces on the worker and on the scheduler nodes. +docker exec slurmctld ip addr add 10.1.1.20/24 dev eth0 label eth0:scheduler +docker exec c1 ip addr add 10.1.1.21/24 dev eth0 label eth0:worker +docker exec c2 ip addr add 10.1.1.22/24 dev eth0 label eth0:worker diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index e6bc55d8..994bc886 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -15,7 +15,7 @@ from distributed.deploy.spec import ProcessInterface, SpecCluster from distributed.deploy.local import nprocesses_nthreads from distributed.scheduler import Scheduler -from distributed.utils import format_bytes, parse_bytes, tmpfile, get_ip_interface +from distributed.utils import format_bytes, parse_bytes, tmpfile logger = logging.getLogger(__name__) @@ -30,7 +30,11 @@ By default, ``process ~= sqrt(cores)`` so that the number of processes and the number of threads per process is roughly the same. interface : str - Network interface like 'eth0' or 'ib0'. + Network interface like 'eth0' or 'ib0'. This will be used both for the + Dask scheduler and the Dask workers interface. If you need a different + interface for the Dask scheduler you can pass it through + the ``scheduler_options`` argument: + ``interface=your_worker_interface, scheduler_options={'interface': your_scheduler_interface}``. nanny : bool Whether or not to start a nanny process local_directory : str @@ -69,8 +73,12 @@ Whether or not to run this cluster object with the async/await syntax security : Security A dask.distributed security object if you're using TLS/SSL - dashboard_address : str or int - An address like ":8787" on which to host the Scheduler's dashboard + scheduler_options : dict + Used to pass additional arguments to Dask Scheduler. For example use + ``scheduler_options={'dasboard_address': ':12435'}`` to specify which + port the web dashboard should use or ``scheduler_options={'host': 'your-host'}`` + to specify the host the Dask scheduler should run on. See + :class:`distributed.Scheduler` for more details. """.strip() @@ -202,9 +210,6 @@ def __init__( if interface: extra = extra + ["--interface", interface] - kwargs.setdefault("host", get_ip_interface(interface)) - else: - kwargs.setdefault("host", "") # Keep information on process, cores, and memory, for use in subclasses self.worker_memory = parse_bytes(memory) if memory is not None else None @@ -420,13 +425,15 @@ def __init__( silence_logs="error", name=None, asynchronous=False, - # Scheduler keywords - interface=None, + # Scheduler-only keywords + dashboard_address=None, host=None, + scheduler_options=None, + # Options for both scheduler and workers + interface=None, protocol="tcp://", - dashboard_address=":8787", - config_name=None, # Job keywords + config_name=None, **kwargs ): self.status = "created" @@ -447,22 +454,47 @@ def __init__( ) ) + if dashboard_address is not None: + raise ValueError( + "Please pass 'dashboard_address' through 'scheduler_options': use\n" + 'cluster = {0}(..., scheduler_options={{"dashboard_address": ":12345"}}) rather than\n' + 'cluster = {0}(..., dashboard_address="12435")'.format( + self.__class__.__name__ + ) + ) + + if host is not None: + raise ValueError( + "Please pass 'host' through 'scheduler_options': use\n" + 'cluster = {0}(..., scheduler_options={{"host": "your-host"}}) rather than\n' + 'cluster = {0}(..., host="your-host")'.format(self.__class__.__name__) + ) + default_config_name = self.job_cls.default_config_name() if config_name is None: config_name = default_config_name if interface is None: interface = dask.config.get("jobqueue.%s.interface" % config_name) + if scheduler_options is None: + scheduler_options = {} + + default_scheduler_options = { + "protocol": protocol, + "dashboard_address": ":8787", + "security": security, + } + # scheduler_options overrides parameters common to both workers and scheduler + scheduler_options = dict(default_scheduler_options, **scheduler_options) + + # Use the same network interface as the workers if scheduler ip has not + # been set through scheduler_options via 'host' or 'interface' + if "host" not in scheduler_options and "interface" not in scheduler_options: + scheduler_options["interface"] = interface scheduler = { "cls": Scheduler, # Use local scheduler for now - "options": { - "protocol": protocol, - "interface": interface, - "host": host, - "dashboard_address": dashboard_address, - "security": security, - }, + "options": scheduler_options, } kwargs["config_name"] = config_name diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index ebbf0c1e..bc9c94e0 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -84,7 +84,7 @@ def test_forward_ip(): cores=8, memory="28GB", name="dask-worker", - host=ip, + scheduler_options={"host": ip}, ) as cluster: assert cluster.scheduler.ip == ip @@ -268,3 +268,78 @@ def test_default_number_of_worker_processes(Cluster): with Cluster(cores=6, memory="4GB") as cluster: assert " --nprocs 3" in cluster.job_script() assert " --nthreads 2" in cluster.job_script() + + +@pytest.mark.parametrize( + "Cluster", + [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster], +) +def test_scheduler_options(Cluster): + net_if_addrs = psutil.net_if_addrs() + interface = list(net_if_addrs.keys())[0] + port = 8804 + + with Cluster( + cores=1, memory="1GB", scheduler_options={"interface": interface, "port": port} + ) as cluster: + scheduler_options = cluster.scheduler_spec["options"] + assert scheduler_options["interface"] == interface + assert scheduler_options["port"] == port + + +@pytest.mark.parametrize( + "Cluster", + [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster], +) +def test_scheduler_options_interface(Cluster): + net_if_addrs = psutil.net_if_addrs() + scheduler_interface = list(net_if_addrs.keys())[0] + worker_interface = "worker-interface" + scheduler_host = socket.gethostname() + + with Cluster(cores=1, memory="1GB", interface=scheduler_interface) as cluster: + scheduler_options = cluster.scheduler_spec["options"] + worker_options = cluster.new_spec["options"] + assert scheduler_options["interface"] == scheduler_interface + assert worker_options["interface"] == scheduler_interface + + with Cluster( + cores=1, + memory="1GB", + interface=worker_interface, + scheduler_options={"interface": scheduler_interface}, + ) as cluster: + scheduler_options = cluster.scheduler_spec["options"] + worker_options = cluster.new_spec["options"] + assert scheduler_options["interface"] == scheduler_interface + assert worker_options["interface"] == worker_interface + + with Cluster( + cores=1, + memory="1GB", + interface=worker_interface, + scheduler_options={"host": scheduler_host}, + ) as cluster: + scheduler_options = cluster.scheduler_spec["options"] + assert scheduler_options.get("interface") is None + assert scheduler_options["host"] == scheduler_host + assert worker_options["interface"] == worker_interface + + +@pytest.mark.parametrize( + "Cluster", + [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster], +) +def test_cluster_error_scheduler_arguments_should_use_scheduler_options(Cluster): + scheduler_host = socket.gethostname() + message_template = "pass {!r} through 'scheduler_options'" + + message = message_template.format("host") + with pytest.raises(ValueError, match=message): + with Cluster(cores=1, memory="1GB", host=scheduler_host): + pass + + message = message_template.format("dashboard_address") + with pytest.raises(ValueError, match=message): + with Cluster(cores=1, memory="1GB", dashboard_address=":8787"): + pass diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index 6b26ffc0..dec9fd97 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -193,3 +193,22 @@ def test_config_name_slurm_takes_custom_config(): with dask.config.set({"jobqueue.slurm-config-name": conf}): with SLURMCluster(config_name="slurm-config-name") as cluster: assert cluster.job_name == "myname" + + +@pytest.mark.env("slurm") +def test_different_interfaces_on_scheduler_and_workers(loop): + with SLURMCluster( + walltime="00:02:00", + cores=1, + memory="2GB", + interface="eth0:worker", + scheduler_options={"interface": "eth0:scheduler"}, + loop=loop, + ) as cluster: + cluster.scale(jobs=1) + with Client(cluster) as client: + future = client.submit(lambda x: x + 1, 10) + + client.wait_for_workers(1) + + assert future.result(QUEUE_WAIT) == 11 diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 264947a2..d4ce3e43 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -4,6 +4,11 @@ Changelog Development version ------------------- +- all cluster classes: add ``scheduler_options`` allows to pass parameters to + the Dask scheduler. For example ``scheduler_options={'interface': 'eth0', + dashboard_addresses=':12435')`` (:pr:`384`). Breaking change: using ``port`` + or ``dashboard_addresses`` arguments raises an error. They have to be passed + through ``scheduler_options``. - all cluster classes: ``processes`` parameter default has changed. By default, ``processes ~= sqrt(cores)`` so that the number of processes and the number of threads per process is roughly the same. Old default was to use one diff --git a/docs/source/interactive.rst b/docs/source/interactive.rst index 4be452c3..72789853 100644 --- a/docs/source/interactive.rst +++ b/docs/source/interactive.rst @@ -129,10 +129,12 @@ dashboard is valuable to help you understand the state of your computation and cluster. Typically, the dashboard is served on a separate port from Jupyter, and so can -be used whether you choose to use Jupyter or not. If you want to open up a +be used whether you choose to use Jupyter or not. If you want to open up a connection to see the dashboard you can do so with SSH Tunneling as described -above. The dashboard's default port is at ``8787``, and is configurable with -the ``dashboard_address=`` keyword to the Dask Jobqueue cluster objects. +above. The dashboard's default port is at ``8787``, and is configurable by +using the ``scheduler_options`` parameter in the Dask Jobqueue cluster object. +For example ``scheduler_options={'dashboard_address': ':12435'}`` would use +12435 for the web dasboard port. However, Jupyter is also able to proxy the dashboard connection through the Jupyter server, allowing you to access the dashboard at