Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import subprocess
import sys
import warnings
import socket
from collections import OrderedDict
from contextlib import contextmanager

Expand All @@ -14,7 +15,7 @@
from distributed import LocalCluster
from distributed.deploy import Cluster
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.utils import (format_bytes, parse_bytes, tmpfile)
from distributed.utils import (format_bytes, parse_bytes, tmpfile, get_ip_interface)

logger = logging.getLogger(__name__)
docstrings = docrep.DocstringProcessor()
Expand Down Expand Up @@ -179,8 +180,6 @@ def __init__(self,
local_directory = dask.config.get('jobqueue.%s.local-directory' % self.scheduler_name)
if extra is None:
extra = dask.config.get('jobqueue.%s.extra' % self.scheduler_name)
if interface:
extra += ' --interface %s ' % interface
if env_extra is None:
env_extra = dask.config.get('jobqueue.%s.env-extra' % self.scheduler_name)

Expand All @@ -196,9 +195,18 @@ def __init__(self,
# This attribute should be overriden
self.job_header = None

# Bind to all network addresses by default
scheduler_address = socket.gethostname()
if 'ip' not in kwargs:
# Bind to all network addresses by default
kwargs['ip'] = ''
else:
scheduler_address = kwargs['ip']
if interface:
extra += ' --interface %s ' % interface
# For correct interface use with defaults binding to all network
# it is needed to choose the correct scheduler address, probably
# not default one.
scheduler_address = get_ip_interface(interface)

self.local_cluster = LocalCluster(n_workers=0, **kwargs)

Expand All @@ -219,7 +227,8 @@ def __init__(self,

# dask-worker command line build
dask_worker_command = '%(python)s -m distributed.cli.dask_worker' % dict(python=sys.executable)
self._command_template = ' '.join([dask_worker_command, self.scheduler.address])
scheduler_address = 'tcp://%s:%d' % (scheduler_address, self.scheduler.port)
self._command_template = ' '.join([dask_worker_command, scheduler_address])
self._command_template += " --nthreads %d" % self.worker_threads
if processes is not None and processes > 1:
self._command_template += " --nprocs %d" % processes
Expand Down