Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion distributed/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import print_function, division, absolute_import

from .config import config
from . import config
from dask.config import config
from .core import connect, rpc
from .deploy import LocalCluster, Adaptive
from .diagnostics import progress
Expand Down
4 changes: 2 additions & 2 deletions distributed/batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
from collections import deque
import logging

import dask
from tornado import gen, locks
from tornado.ioloop import IOLoop

from .config import config
from .core import CommClosedError
from .utils import parse_timedelta

Expand Down Expand Up @@ -51,7 +51,7 @@ def __init__(self, interval, loop=None):
self.batch_count = 0
self.byte_count = 0
self.next_deadline = None
self.recent_message_log = deque(maxlen=config.get('recent-messages-log-length', 0))
self.recent_message_log = deque(maxlen=dask.config.get('distributed.comm.recent-messages-log-length'))

def start(self, comm):
self.comm = comm
Expand Down
6 changes: 3 additions & 3 deletions distributed/bokeh/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@
Quad, TapTool, OpenURL, Button, Select)
from bokeh.palettes import Spectral9
from bokeh.plotting import figure
import dask
from tornado import gen

from ..config import config
from ..diagnostics.progress_stream import nbytes_bar
from .. import profile
from ..utils import log_errors, parse_timedelta

if config.get('bokeh-export-tool', False):
if dask.config.get('distributed.dashboard.export-tool'):
from .export_tool import ExportTool
else:
ExportTool = None


profile_interval = config.get('profile-interval', 10)
profile_interval = dask.config.get('distributed.worker.profile.interval')
profile_interval = parse_timedelta(profile_interval, default='ms')


Expand Down
15 changes: 7 additions & 8 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
scatter_to_workers, gather_from_workers)
from .cfexecutor import ClientExecutor
from .compatibility import Queue as pyQueue, Empty, isqueue, html_escape
from .config import config
from .core import connect, rpc, clean_exception, CommClosedError
from .metrics import time
from .node import Node
Expand Down Expand Up @@ -498,16 +497,16 @@ def __init__(self, address=None, loop=None, timeout=no_default,
name=None, heartbeat_interval=None,
serializers=None, deserializers=None, **kwargs):
if timeout == no_default:
timeout = config.get('connect-timeout', '10s')
timeout = dask.config.get('distributed.comm.timeouts.connect')
if timeout is not None:
timeout = parse_timedelta(timeout, 's')
self._timeout = timeout

self.futures = dict()
self.refcount = defaultdict(lambda: 0)
self.coroutines = []
if name is None and 'client-name' in config:
name = config['client-name']
if name is None:
name = dask.config.get('client-name', None)
self.id = type(self).__name__ + ('-' + name + '-' if name else '-') + str(uuid.uuid1(clock_seq=os.getpid()))
self.generation = 0
self.status = 'newly-created'
Expand Down Expand Up @@ -538,7 +537,7 @@ def __init__(self, address=None, loop=None, timeout=no_default,
self.loop = self._loop_runner.loop

if heartbeat_interval is None:
heartbeat_interval = config.get('client-heartbeat-interval', 5000)
heartbeat_interval = dask.config.get('distributed.client.heartbeat')
heartbeat_interval = parse_timedelta(heartbeat_interval, default='ms')

self._periodic_callbacks = dict()
Expand All @@ -551,8 +550,8 @@ def __init__(self, address=None, loop=None, timeout=no_default,
io_loop=self.loop
)

if address is None and 'scheduler-address' in config:
address = config['scheduler-address']
if address is None:
address = dask.config.get('scheduler-address', None)
if address:
logger.info("Config value `scheduler-address` found: %s",
address)
Expand Down Expand Up @@ -671,7 +670,7 @@ def _repr_html_(self):
host = 'localhost'
else:
host = rest.split(':')[0]
template = config.get('diagnostics-link', 'http://{host}:{port}/status')
template = dask.config.get('distributed.dashboard.link')
address = template.format(host=host, port=port, **os.environ)
text += " <li><b>Dashboard: </b><a href='%(web)s' target='_blank'>%(web)s</a>\n" % {'web': address}

Expand Down
5 changes: 3 additions & 2 deletions distributed/comm/addressing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

import six

from ..config import config
import dask

from . import registry


DEFAULT_SCHEME = config.get('default-scheme', 'tcp')
DEFAULT_SCHEME = dask.config.get('distributed.comm.default-scheme')


def parse_address(addr, strict=False):
Expand Down
5 changes: 2 additions & 3 deletions distributed/comm/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
from datetime import timedelta
import logging

import dask
from six import with_metaclass

from tornado import gen

from ..config import config
from ..metrics import time
from ..utils import parse_timedelta
from . import registry
Expand Down Expand Up @@ -161,7 +160,7 @@ def connect(addr, timeout=None, deserialize=True, connection_args=None):
retried until the *timeout* is expired.
"""
if timeout is None:
timeout = config.get('connect-timeout', '10s')
timeout = dask.config.get('distributed.comm.timeouts.connect')
timeout = parse_timedelta(timeout, default='seconds')

scheme, loc = parse_address(addr)
Expand Down
8 changes: 3 additions & 5 deletions distributed/comm/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
except ImportError:
ssl = None

import dask
import tornado
from tornado import gen, netutil
from tornado.iostream import StreamClosedError, IOStream
from tornado.tcpclient import TCPClient
from tornado.tcpserver import TCPServer

from .. import config
from ..compatibility import finalize, PY3
from ..utils import (ensure_bytes, ensure_ip, get_ip, get_ipv6, nbytes,
parse_timedelta, shutting_down)
Expand All @@ -42,8 +42,6 @@ def get_total_physical_memory():

MAX_BUFFER_SIZE = get_total_physical_memory()

DEFAULT_BACKLOG = 2048


def set_tcp_timeout(stream):
"""
Expand All @@ -52,7 +50,7 @@ def set_tcp_timeout(stream):
if stream.closed():
return

timeout = config.get('tcp-timeout', 30)
timeout = dask.config.get('distributed.comm.timeouts.tcp')
timeout = int(parse_timedelta(timeout, default='seconds'))

sock = stream.socket
Expand Down Expand Up @@ -369,7 +367,7 @@ def start(self):
self.tcp_server = TCPServer(max_buffer_size=MAX_BUFFER_SIZE,
**self.server_args)
self.tcp_server.handle_stream = self._handle_stream
backlog = int(config.get('socket-backlog', DEFAULT_BACKLOG))
backlog = int(dask.config.get('distributed.comm.socket-backlog'))
for i in range(5):
try:
# When shuffling data between workers, there can
Expand Down
Loading