From 992d741f6da883bf05f4b9d9acb9e2401abb37c4 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Thu, 16 Aug 2018 16:14:57 +0200 Subject: [PATCH] overriding scheduler address --- dask_jobqueue/core.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index f0f6b852..253ef752 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -6,6 +6,7 @@ import subprocess import sys import warnings +import socket from collections import OrderedDict from contextlib import contextmanager @@ -14,7 +15,7 @@ from distributed import LocalCluster from distributed.deploy import Cluster from distributed.diagnostics.plugin import SchedulerPlugin -from distributed.utils import (format_bytes, parse_bytes, tmpfile) +from distributed.utils import (format_bytes, parse_bytes, tmpfile, get_ip_interface) logger = logging.getLogger(__name__) docstrings = docrep.DocstringProcessor() @@ -179,8 +180,6 @@ def __init__(self, local_directory = dask.config.get('jobqueue.%s.local-directory' % self.scheduler_name) if extra is None: extra = dask.config.get('jobqueue.%s.extra' % self.scheduler_name) - if interface: - extra += ' --interface %s ' % interface if env_extra is None: env_extra = dask.config.get('jobqueue.%s.env-extra' % self.scheduler_name) @@ -196,9 +195,18 @@ def __init__(self, # This attribute should be overriden self.job_header = None - # Bind to all network addresses by default + scheduler_address = socket.gethostname() if 'ip' not in kwargs: + # Bind to all network addresses by default kwargs['ip'] = '' + else: + scheduler_address = kwargs['ip'] + if interface: + extra += ' --interface %s ' % interface + # For correct interface use with defaults binding to all network + # it is needed to choose the correct scheduler address, probably + # not default one. + scheduler_address = get_ip_interface(interface) self.local_cluster = LocalCluster(n_workers=0, **kwargs) @@ -219,7 +227,8 @@ def __init__(self, # dask-worker command line build dask_worker_command = '%(python)s -m distributed.cli.dask_worker' % dict(python=sys.executable) - self._command_template = ' '.join([dask_worker_command, self.scheduler.address]) + scheduler_address = 'tcp://%s:%d' % (scheduler_address, self.scheduler.port) + self._command_template = ' '.join([dask_worker_command, scheduler_address]) self._command_template += " --nthreads %d" % self.worker_threads if processes is not None and processes > 1: self._command_template += " --nprocs %d" % processes