diff --git a/distributed/__init__.py b/distributed/__init__.py index fe2cfd1edb2..b4a70f81730 100644 --- a/distributed/__init__.py +++ b/distributed/__init__.py @@ -1,6 +1,7 @@ from __future__ import print_function, division, absolute_import -from .config import config +from . import config +from dask.config import config from .core import connect, rpc from .deploy import LocalCluster, Adaptive from .diagnostics import progress diff --git a/distributed/batched.py b/distributed/batched.py index 9326a53bdee..13c7ccbadbf 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -3,10 +3,10 @@ from collections import deque import logging +import dask from tornado import gen, locks from tornado.ioloop import IOLoop -from .config import config from .core import CommClosedError from .utils import parse_timedelta @@ -51,7 +51,7 @@ def __init__(self, interval, loop=None): self.batch_count = 0 self.byte_count = 0 self.next_deadline = None - self.recent_message_log = deque(maxlen=config.get('recent-messages-log-length', 0)) + self.recent_message_log = deque(maxlen=dask.config.get('distributed.comm.recent-messages-log-length')) def start(self, comm): self.comm = comm diff --git a/distributed/bokeh/components.py b/distributed/bokeh/components.py index dc46bb68695..2661a32889f 100644 --- a/distributed/bokeh/components.py +++ b/distributed/bokeh/components.py @@ -11,20 +11,20 @@ Quad, TapTool, OpenURL, Button, Select) from bokeh.palettes import Spectral9 from bokeh.plotting import figure +import dask from tornado import gen -from ..config import config from ..diagnostics.progress_stream import nbytes_bar from .. import profile from ..utils import log_errors, parse_timedelta -if config.get('bokeh-export-tool', False): +if dask.config.get('distributed.dashboard.export-tool'): from .export_tool import ExportTool else: ExportTool = None -profile_interval = config.get('profile-interval', 10) +profile_interval = dask.config.get('distributed.worker.profile.interval') profile_interval = parse_timedelta(profile_interval, default='ms') diff --git a/distributed/client.py b/distributed/client.py index 79435096a46..815d9a55c6f 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -47,7 +47,6 @@ scatter_to_workers, gather_from_workers) from .cfexecutor import ClientExecutor from .compatibility import Queue as pyQueue, Empty, isqueue, html_escape -from .config import config from .core import connect, rpc, clean_exception, CommClosedError from .metrics import time from .node import Node @@ -498,7 +497,7 @@ def __init__(self, address=None, loop=None, timeout=no_default, name=None, heartbeat_interval=None, serializers=None, deserializers=None, **kwargs): if timeout == no_default: - timeout = config.get('connect-timeout', '10s') + timeout = dask.config.get('distributed.comm.timeouts.connect') if timeout is not None: timeout = parse_timedelta(timeout, 's') self._timeout = timeout @@ -506,8 +505,8 @@ def __init__(self, address=None, loop=None, timeout=no_default, self.futures = dict() self.refcount = defaultdict(lambda: 0) self.coroutines = [] - if name is None and 'client-name' in config: - name = config['client-name'] + if name is None: + name = dask.config.get('client-name', None) self.id = type(self).__name__ + ('-' + name + '-' if name else '-') + str(uuid.uuid1(clock_seq=os.getpid())) self.generation = 0 self.status = 'newly-created' @@ -538,7 +537,7 @@ def __init__(self, address=None, loop=None, timeout=no_default, self.loop = self._loop_runner.loop if heartbeat_interval is None: - heartbeat_interval = config.get('client-heartbeat-interval', 5000) + heartbeat_interval = dask.config.get('distributed.client.heartbeat') heartbeat_interval = parse_timedelta(heartbeat_interval, default='ms') self._periodic_callbacks = dict() @@ -551,8 +550,8 @@ def __init__(self, address=None, loop=None, timeout=no_default, io_loop=self.loop ) - if address is None and 'scheduler-address' in config: - address = config['scheduler-address'] + if address is None: + address = dask.config.get('scheduler-address', None) if address: logger.info("Config value `scheduler-address` found: %s", address) @@ -671,7 +670,7 @@ def _repr_html_(self): host = 'localhost' else: host = rest.split(':')[0] - template = config.get('diagnostics-link', 'http://{host}:{port}/status') + template = dask.config.get('distributed.dashboard.link') address = template.format(host=host, port=port, **os.environ) text += "
  • Dashboard: %(web)s\n" % {'web': address} diff --git a/distributed/comm/addressing.py b/distributed/comm/addressing.py index 20a3c05f8f7..8ff401475b9 100644 --- a/distributed/comm/addressing.py +++ b/distributed/comm/addressing.py @@ -2,11 +2,12 @@ import six -from ..config import config +import dask + from . import registry -DEFAULT_SCHEME = config.get('default-scheme', 'tcp') +DEFAULT_SCHEME = dask.config.get('distributed.comm.default-scheme') def parse_address(addr, strict=False): diff --git a/distributed/comm/core.py b/distributed/comm/core.py index 1ace05283d2..ff3607b660e 100644 --- a/distributed/comm/core.py +++ b/distributed/comm/core.py @@ -4,11 +4,10 @@ from datetime import timedelta import logging +import dask from six import with_metaclass - from tornado import gen -from ..config import config from ..metrics import time from ..utils import parse_timedelta from . import registry @@ -161,7 +160,7 @@ def connect(addr, timeout=None, deserialize=True, connection_args=None): retried until the *timeout* is expired. """ if timeout is None: - timeout = config.get('connect-timeout', '10s') + timeout = dask.config.get('distributed.comm.timeouts.connect') timeout = parse_timedelta(timeout, default='seconds') scheme, loc = parse_address(addr) diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index e80fa820dbf..9a8fe7bf087 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -11,13 +11,13 @@ except ImportError: ssl = None +import dask import tornado from tornado import gen, netutil from tornado.iostream import StreamClosedError, IOStream from tornado.tcpclient import TCPClient from tornado.tcpserver import TCPServer -from .. import config from ..compatibility import finalize, PY3 from ..utils import (ensure_bytes, ensure_ip, get_ip, get_ipv6, nbytes, parse_timedelta, shutting_down) @@ -42,8 +42,6 @@ def get_total_physical_memory(): MAX_BUFFER_SIZE = get_total_physical_memory() -DEFAULT_BACKLOG = 2048 - def set_tcp_timeout(stream): """ @@ -52,7 +50,7 @@ def set_tcp_timeout(stream): if stream.closed(): return - timeout = config.get('tcp-timeout', 30) + timeout = dask.config.get('distributed.comm.timeouts.tcp') timeout = int(parse_timedelta(timeout, default='seconds')) sock = stream.socket @@ -369,7 +367,7 @@ def start(self): self.tcp_server = TCPServer(max_buffer_size=MAX_BUFFER_SIZE, **self.server_args) self.tcp_server.handle_stream = self._handle_stream - backlog = int(config.get('socket-backlog', DEFAULT_BACKLOG)) + backlog = int(dask.config.get('distributed.comm.socket-backlog')) for i in range(5): try: # When shuffling data between workers, there can diff --git a/distributed/config.py b/distributed/config.py index a0f488474e5..17320f05081 100644 --- a/distributed/config.py +++ b/distributed/config.py @@ -1,90 +1,72 @@ from __future__ import print_function, division, absolute_import -from contextlib import contextmanager import logging import logging.config import os import sys -import warnings -from .compatibility import FileExistsError, logging_names +import dask +import yaml -logger = logging.getLogger(__name__) +from .compatibility import logging_names -config = {} - - -def ensure_config_file(source, destination): - if not os.path.exists(destination): - import shutil - if not os.path.exists(os.path.dirname(destination)): - try: - os.mkdir(os.path.dirname(destination)) - except FileExistsError: - pass - # Atomically create destination. Parallel testing discovered - # a race condition where a process can be busy creating the - # destination while another process reads an empty config file. - tmp = '%s.tmp.%d' % (destination, os.getpid()) - shutil.copy(source, tmp) - try: - os.rename(tmp, destination) - except OSError: - os.remove(tmp) - - -def determine_config_file(): - path = os.environ.get('DASK_CONFIG') - if path: - if (os.path.exists(path) and - (os.path.isfile(path) or os.path.islink(path))): - return path - warnings.warn("DASK_CONFIG set to '%s' but file does not exist " - "or is not a regular file" % (path,), - UserWarning) - - dirname = os.path.dirname(__file__) - default_path = os.path.join(dirname, 'config.yaml') - path = os.path.join(os.path.expanduser('~'), '.dask', 'config.yaml') - - try: - ensure_config_file(default_path, path) - except EnvironmentError as e: - warnings.warn("Could not write default config file to '%s'. " - "Received error %s" % (path, e), - UserWarning) - - return path if os.path.exists(path) else default_path - - -def load_config_file(config, path): - with open(path) as f: - text = f.read() - config.update(yaml.load(text) or {}) - - -def load_env_vars(config): - for name, value in os.environ.items(): - if name.startswith('DASK_'): - varname = name[5:].lower().replace('_', '-') - config[varname] = _parse_env_value(value) - - -def _parse_env_value(value): - """ Convert a string to an integer, float or boolean (in that order) if possible. """ - bools = { - 'true': True, - 'false': False - } - try: - return int(value) - except ValueError: - pass - try: - return float(value) - except ValueError: - pass - return bools.get(value.lower(), value) +config = dask.config.config + + +fn = os.path.join(os.path.dirname(__file__), 'distributed.yaml') +dask.config.ensure_file(source=fn) + +with open(fn) as f: + defaults = yaml.load(f) + +dask.config.update(dask.config.config, defaults, priority='old') + +aliases = { + 'allowed-failures': 'distributed.scheduler.allowed-failures', + 'bandwidth': 'distributed.scheduler.bandwidth', + 'default-data-size': 'distributed.scheduler.default-data-size', + 'transition-log-length': 'distributed.scheduler.transition-log-length', + 'work-stealing': 'distributed.scheduler.work-stealing', + 'worker-ttl': 'distributed.scheduler.worker-ttl', + + 'multiprocessing-method': 'distributed.worker.multiprocessing-method', + 'use-file-locking': 'distributed.worker.use-file-locking', + 'profile-interval': 'distributed.worker.profile.interval', + 'profile-cycle-interval': 'distributed.worker.profile.cycle', + 'worker-memory-target': 'distributed.worker.memory.target', + 'worker-memory-spill': 'distributed.worker.memory.spill', + 'worker-memory-pause': 'distributed.worker.memory.pause', + 'worker-memory-terminate': 'distributed.worker.memory.terminate', + + 'heartbeat-interval': 'distributed.client.heartbeat', + + 'compression': 'distributed.comm.compression', + 'connect-timeout': 'distributed.comm.timeouts.connect', + 'tcp-timeout': 'distributed.comm.timeouts.tcp', + 'default-scheme': 'distributed.comm.default-scheme', + 'socket-backlog': 'distributed.comm.socket-backlog', + 'recent-messages-log-length': 'distributed.comm.recent-messages-log-length', + + 'diagnostics-link': 'distributed.dashboard.link', + 'bokeh-export-tool': 'distributed.dashboard.export-tool', + + 'tick-time': 'distributed.admin.tick.interval', + 'tick-maximum-delay': 'distributed.admin.tick.limit', + 'log-length': 'distributed.admin.log-length', + 'log-format': 'distributed.admin.log-format', + 'pdb-on-err': 'distributed.admin.pdb-on-err', +} + +dask.config.rename(aliases) + + +######################### +# Logging specific code # +######################### +# +# Here we enact the policies in the logging part of the configuration + +logger = logging.getLogger(__name__) def _initialize_logging_old_style(config): @@ -108,7 +90,8 @@ def _initialize_logging_old_style(config): loggers.update(config.get('logging', {})) handler = logging.StreamHandler(sys.stderr) - handler.setFormatter(logging.Formatter(log_format)) + handler.setFormatter(logging.Formatter(dask.config.get('distributed.admin.log-format', + config=config))) for name, level in loggers.items(): if isinstance(level, str): level = logging_names[level.upper()] @@ -124,7 +107,7 @@ def _initialize_logging_new_style(config): Initialize logging using logging's "Configuration dictionary schema". (ref.: https://docs.python.org/2/library/logging.config.html#logging-config-dictschema) """ - logging.config.dictConfig(config['logging']) + logging.config.dictConfig(config.get('logging')) def _initialize_logging_file_config(config): @@ -132,7 +115,7 @@ def _initialize_logging_file_config(config): Initialize logging using logging's "Configuration file format". (ref.: https://docs.python.org/2/library/logging.config.html#configuration-file-format) """ - logging.config.fileConfig(config['logging-file-config'], disable_existing_loggers=False) + logging.config.fileConfig(config.get('logging-file-config'), disable_existing_loggers=False) def initialize_logging(config): @@ -150,38 +133,4 @@ def initialize_logging(config): _initialize_logging_old_style(config) -@contextmanager -def set_config(arg=None, **kwargs): - if arg and not kwargs: - kwargs = arg - old = {} - for key in kwargs: - if key in config: - old[key] = config[key] - - for key, value in kwargs.items(): - config[key] = value - - try: - yield - finally: - for key in kwargs: - if key in old: - config[key] = old[key] - else: - del config[key] - - -try: - import yaml -except ImportError: - pass -else: - path = determine_config_file() - load_config_file(config, path) - -load_env_vars(config) - -log_format = config.get('log-format', '%(name)s - %(levelname)s - %(message)s') - -initialize_logging(config) +initialize_logging(dask.config.config) diff --git a/distributed/config.yaml b/distributed/config.yaml deleted file mode 100644 index fab041560f1..00000000000 --- a/distributed/config.yaml +++ /dev/null @@ -1,91 +0,0 @@ -version: 1 - -# logging: -# distributed: info -# distributed.client: warning -# bokeh: critical -# # http://stackoverflow.com/questions/21234772/python-tornado-disable-logging-to-stderr -# tornado: critical -# tornado.application: error - - -##################### -# Scheduler options # -##################### -# -# bandwidth: 100000000 # 100 MB/s estimated worker-worker bandwidth -# allowed-failures: 3 # number of retries before a task is considered bad -# pdb-on-err: False # enter debug mode on scheduling error -# transition-log-length: 100000 -# work-stealing: True # workers should steal tasks from each other -# worker-ttl: null # like '60s'. Time to live for workers. They must heartbeat faster than this - - -################## -# Worker options # -################## -# -# multiprocessing-method: forkserver -# use-file-locking: True -# profile-interval: 10ms # Time between statistical profiling queries -# profile-cycle-interval: 1000ms # Time between starting new profile - -# Fractions of worker memory at which we take action to avoid memory blowup -# Set any of the lower three values to False to turn off the behavior entirely -# -# worker-memory-target: 0.60 # target fraction to stay below -# worker-memory-spill: 0.70 # fraction at which we spill to disk -# worker-memory-pause: 0.80 # fraction at which we pause worker threads -# worker-memory-terminate: 0.95 # fraction at which we terminate the worker - - -################## -# Client Options # -################## - -# client-heartbeat-interval: 5s # time between client heartbeats - - -######################### -# Communication options # -######################### -# -# compression: auto -# connect-timeout: 10s # seconds delay before connecting fails -# tcp-timeout: 30s # seconds delay before calling an unresponsive connection dead -# default-scheme: tcp -# require-encryption: False # whether to require encryption on non-local comms -# socket-backlog: 2048 -# recent-messages-log-length: 0 # number of messages to keep for debugging - -# tls: -# ca-file: xxx.pem -# scheduler: -# key: xxx.pem -# cert: xxx.pem -# worker: -# key: xxx.pem -# cert: xxx.pem -# client: -# key: xxx.pem -# cert: xxx.pem -# ciphers: -# ECDHE-ECDSA-AES128-GCM-SHA256 - - -################### -# Bokeh dashboard # -################### -# -# bokeh-export-tool: False -# diagnostics-link: "http://{host}:{port}/status" - -################## -# Administrative # -################## -# -# tick-time: 20ms # time between event loop health checks -# tick-maximum-delay: 3s # time allowed before triggering a warning - -# log-length: 10000 # default length of logs to keep in memory -# log-format: '%(name)s - %(levelname)s - %(message)s' diff --git a/distributed/core.py b/distributed/core.py index 5126e963282..0827e603405 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -10,10 +10,9 @@ import uuid import weakref +import dask from six import string_types - from toolz import assoc - from tornado import gen from tornado.ioloop import IOLoop from tornado.locks import Event @@ -22,7 +21,6 @@ from .comm import (connect, listen, CommClosedError, normalize_address, unparse_host_port, get_address_host_port) -from .config import config from .metrics import time from .system_monitor import SystemMonitor from .utils import (get_traceback, truncate_exception, ignoring, shutting_down, @@ -47,7 +45,7 @@ def get_total_physical_memory(): MAX_BUFFER_SIZE = get_total_physical_memory() -tick_maximum_delay = parse_timedelta(config.get('tick-maximum-delay', 1000), default='ms') +tick_maximum_delay = parse_timedelta(dask.config.get('distributed.admin.tick.limit'), default='ms') class Server(object): @@ -127,7 +125,7 @@ def __init__(self, handlers, connection_limit=512, deserialize=True, self._last_tick = time() pc = PeriodicCallback( self._measure_tick, - parse_timedelta(config.get('tick-time', 20), default='ms') * 1000, + parse_timedelta(dask.config.get('distributed.admin.tick.interval'), default='ms') * 1000, io_loop=self.io_loop ) self.periodic_callbacks['tick'] = pc diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index bd5caa709c0..319b969d513 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -2,11 +2,12 @@ import os from weakref import ref -from ..config import config -from ..utils import format_bytes, PeriodicCallback, log_errors, ignoring +import dask from .adaptive import Adaptive +from ..utils import format_bytes, PeriodicCallback, log_errors, ignoring + logger = logging.getLogger(__name__) @@ -137,7 +138,7 @@ def _widget(self): layout = Layout(width='150px') if 'bokeh' in self.scheduler.services: - template = config.get('diagnostics-link', 'http://{host}:{port}/status') + template = dask.config.get('distributed.dashboard.link') host = self.scheduler.address.split('://')[1].split(':')[0] port = self.scheduler.services['bokeh'].port diff --git a/distributed/diskutils.py b/distributed/diskutils.py index 2781e9a56f5..ccc3096c038 100644 --- a/distributed/diskutils.py +++ b/distributed/diskutils.py @@ -8,9 +8,9 @@ import stat import tempfile -from . import locket +import dask -from . import config +from . import locket from .compatibility import finalize @@ -20,7 +20,7 @@ def is_locking_enabled(): - return config.get('use-file-locking', True) + return dask.config.get('distributed.worker.use-file-locking') def safe_unlink(path): diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml new file mode 100644 index 00000000000..35766471e00 --- /dev/null +++ b/distributed/distributed.yaml @@ -0,0 +1,84 @@ +distributed: + version: 2 + # logging: + # distributed: info + # distributed.client: warning + # bokeh: critical + # # http://stackoverflow.com/questions/21234772/python-tornado-disable-logging-to-stderr + # tornado: critical + # tornado.application: error + + scheduler: + allowed-failures: 3 # number of retries before a task is considered bad + bandwidth: 100000000 # 100 MB/s estimated worker-worker bandwidth + default-data-size: 1000 + transition-log-length: 100000 + work-stealing: True # workers should steal tasks from each other + worker-ttl: null # like '60s'. Time to live for workers. They must heartbeat faster than this + + worker: + multiprocessing-method: forkserver + use-file-locking: True + + profile: + interval: 10ms # Time between statistical profiling queries + cycle: 1000ms # Time between starting new profile + + # Fractions of worker memory at which we take action to avoid memory blowup + # Set any of the lower three values to False to turn off the behavior entirely + memory: + target: 0.60 # target fraction to stay below + spill: 0.70 # fraction at which we spill to disk + pause: 0.80 # fraction at which we pause worker threads + terminate: 0.95 # fraction at which we terminate the worker + + client: + heartbeat: 5s # time between client heartbeats + + comm: + compression: auto + default-scheme: tcp + socket-backlog: 2048 + recent-messages-log-length: 0 # number of messages to keep for debugging + + timeouts: + connect: 10s # time before connecting fails + tcp: 30s # time before calling an unresponsive connection dead + + # require-encryption: False # whether to require encryption on non-local comms + # + # tls: + # ca-file: xxx.pem + # scheduler: + # key: xxx.pem + # cert: xxx.pem + # worker: + # key: xxx.pem + # cert: xxx.pem + # client: + # key: xxx.pem + # cert: xxx.pem + # ciphers: + # ECDHE-ECDSA-AES128-GCM-SHA256 + + + ################### + # Bokeh dashboard # + ################### + + dashboard: + link: "http://{host}:{port}/status" + export-tool: False + + ################## + # Administrative # + ################## + + admin: + tick: + interval: 20ms # time between event loop health checks + limit: 3s # time allowed before triggering a warning + + log-length: 10000 # default length of logs to keep in memory + log-format: '%(name)s - %(levelname)s - %(message)s' + pdb-on-err: False # enter debug mode on scheduling error diff --git a/distributed/nanny.py b/distributed/nanny.py index 39805a004ce..afe02431438 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -9,12 +9,12 @@ import threading import uuid +import dask from tornado import gen from tornado.ioloop import IOLoop, TimeoutError from tornado.locks import Event from .comm import get_address_host, get_local_address_for, unparse_host_port -from .config import config from .core import rpc, RPCClosed, CommClosedError, coerce_to_address from .metrics import time from .node import ServerNode @@ -48,8 +48,8 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, if scheduler_file: cfg = json_load_robust(scheduler_file) self.scheduler_addr = cfg['address'] - elif scheduler_ip is None and config.get('scheduler-address'): - self.scheduler_addr = config['scheduler-address'] + elif scheduler_ip is None and dask.config.get('scheduler-address'): + self.scheduler_addr = dask.config.get('scheduler-address') elif scheduler_port is None: self.scheduler_addr = coerce_to_address(scheduler_ip) else: @@ -64,7 +64,7 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, self.preload_argv = preload_argv self.contact_address = contact_address - self.memory_terminate_fraction = config.get('worker-memory-terminate', 0.95) + self.memory_terminate_fraction = dask.config.get('distributed.worker.memory.terminate') self.security = security or Security() assert isinstance(self.security, Security) diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py index 398fb508a81..49d20175cf4 100644 --- a/distributed/protocol/compression.py +++ b/distributed/protocol/compression.py @@ -8,6 +8,7 @@ import logging import random +import dask from dask.context import _globals from toolz import identity, partial @@ -19,7 +20,6 @@ except ImportError: blosc = False -from ..config import config from ..utils import ignoring, ensure_bytes @@ -98,7 +98,7 @@ def _fixed_lz4_decompress(data): 'decompress': blosc.decompress} -default = config.get('compression', 'auto') +default = dask.config.get('distributed.comm.compression') if default != 'auto': if default in compressions: default_compression = default diff --git a/distributed/scheduler.py b/distributed/scheduler.py index cb4b7897f86..14acf620cef 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -24,14 +24,12 @@ from tornado.gen import Return from tornado.ioloop import IOLoop -from dask.core import reverse_dict -from dask.order import order +import dask from .batched import BatchedSend from .comm import (normalize_address, resolve_address, get_address_host, unparse_host_port) from .compatibility import finalize, unicode -from .config import config, log_format from .core import (rpc, connect, send_recv, error_message, clean_exception, CommClosedError) from . import profile @@ -56,11 +54,11 @@ logger = logging.getLogger(__name__) -BANDWIDTH = config.get('bandwidth', 100e6) -ALLOWED_FAILURES = config.get('allowed-failures', 3) +BANDWIDTH = dask.config.get('distributed.scheduler.bandwidth') +ALLOWED_FAILURES = dask.config.get('distributed.scheduler.allowed-failures') -LOG_PDB = config.get('pdb-on-err') or os.environ.get('DASK_ERROR_PDB', False) -DEFAULT_DATA_SIZE = config.get('default-data-size', 1000) +LOG_PDB = dask.config.get('distributed.admin.pdb-on-err') +DEFAULT_DATA_SIZE = dask.config.get('distributed.scheduler.default-data-size') DEFAULT_EXTENSIONS = [ LockExtension, @@ -70,7 +68,7 @@ VariableExtension, ] -if config.get('work-stealing', True): +if dask.config.get('distributed.scheduler.work-stealing'): DEFAULT_EXTENSIONS.append(WorkStealing) ALL_TASK_STATES = {'released', 'waiting', 'no-worker', 'processing', 'erred', 'memory'} @@ -756,7 +754,7 @@ def __init__( self.service_specs = services or {} self.services = {} self.scheduler_file = scheduler_file - worker_ttl = worker_ttl or config.get('worker-ttl') + worker_ttl = worker_ttl or dask.config.get('distributed.scheduler.worker-ttl') self.worker_ttl = parse_timedelta(worker_ttl) if worker_ttl else None self.security = security or Security() @@ -868,9 +866,8 @@ def __init__( self.extensions = {} self.plugins = [] - self.transition_log = deque(maxlen=config.get('transition-log-length', - 100000)) - self.log = deque(maxlen=config.get('transition-log-length', 100000)) + self.transition_log = deque(maxlen=dask.config.get('distributed.scheduler.transition-log-length')) + self.log = deque(maxlen=dask.config.get('distributed.scheduler.transition-log-length')) self.worker_handlers = {'task-finished': self.handle_task_finished, 'task-erred': self.handle_task_erred, @@ -1166,8 +1163,8 @@ def cleanup(self): yield future def _setup_logging(self): - self._deque_handler = DequeHandler(n=config.get('log-length', 10000)) - self._deque_handler.setFormatter(logging.Formatter(log_format)) + self._deque_handler = DequeHandler(n=dask.config.get('distributed.admin.log-length')) + self._deque_handler.setFormatter(logging.Formatter(dask.config.get('distributed.admin.log-format'))) logger.addHandler(self._deque_handler) finalize(self, logger.removeHandler, self._deque_handler) @@ -1331,7 +1328,7 @@ def update_graph(self, client=None, tasks=None, keys=None, already_in_memory.add(k) if already_in_memory: - dependents = reverse_dict(dependencies) + dependents = dask.core.reverse_dict(dependencies) stack = list(already_in_memory) done = set(already_in_memory) while stack: # remove unnecessary dependencies @@ -1382,7 +1379,7 @@ def update_graph(self, client=None, tasks=None, keys=None, if isinstance(user_priority, Number): user_priority = {k: user_priority for k in tasks} - priority = priority or order(tasks) # TODO: define order wrt old graph + priority = priority or dask.order.order(tasks) # TODO: define order wrt old graph if submitting_task: # sub-tasks get better priority than parent tasks ts = self.tasks.get(submitting_task) @@ -4086,7 +4083,7 @@ def get_profile(self, comm=None, workers=None, merge_workers=True, @gen.coroutine def get_profile_metadata(self, comm=None, workers=None, merge_workers=True, start=None, stop=None, profile_cycle_interval=None): - dt = profile_cycle_interval or config.get('profile-cycle-interval', 1000) + dt = profile_cycle_interval or dask.config.get('distributed.worker.profile.cycle') dt = parse_timedelta(dt, default='ms') if workers is None: diff --git a/distributed/security.py b/distributed/security.py index 59fef7925a6..0a40396a54a 100644 --- a/distributed/security.py +++ b/distributed/security.py @@ -5,7 +5,7 @@ except ImportError: ssl = None -from . import config +import dask _roles = ['client', 'scheduler', 'worker'] @@ -49,7 +49,7 @@ class Security(object): __slots__ = tuple(_fields) def __init__(self, **kwargs): - self._init_from_dict(config) + self._init_from_dict(dask.config.config) for k, v in kwargs.items(): if v is not None: setattr(self, k, v) diff --git a/distributed/stealing.py b/distributed/stealing.py index 2994ff25181..a1ff2bbd40d 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -3,10 +3,9 @@ from collections import defaultdict, deque import logging from math import log -import os from time import time -from .config import config +import dask from .core import CommClosedError from .diagnostics.plugin import SchedulerPlugin from .utils import log_errors, PeriodicCallback @@ -23,7 +22,7 @@ logger = logging.getLogger(__name__) -LOG_PDB = config.get('pdb-on-err') or os.environ.get('DASK_ERROR_PDB', False) +LOG_PDB = dask.config.get('distributed.admin.pdb-on-err') class WorkStealing(SchedulerPlugin): diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 87158b9912b..e9f204383a2 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -28,10 +28,9 @@ import dask from dask import delayed from dask.context import _globals -from distributed import (Worker, Nanny, fire_and_forget, config, LocalCluster, +from distributed import (Worker, Nanny, fire_and_forget, LocalCluster, get_client, secede, get_worker, Executor, profile, TimeoutError) -from distributed.config import set_config from distributed.comm import CommClosedError from distributed.client import (Client, Future, wait, as_completed, tokenize, _get_global_client, default_client, @@ -5182,16 +5181,15 @@ def test_avoid_delayed_finalize(c, s, a, b): @gen_cluster() def test_config_scheduler_address(s, a, b): - config['scheduler-address'] = s.address - with captured_logger('distributed.client') as sio: - c = yield Client(asynchronous=True) - assert c.scheduler.address == s.address + with dask.config.set({'scheduler-address': s.address}): + with captured_logger('distributed.client') as sio: + c = yield Client(asynchronous=True) + assert c.scheduler.address == s.address - text = sio.getvalue() - assert s.address in text + text = sio.getvalue() + assert s.address in text - del config['scheduler-address'] - yield c.close() + yield c.close() @gen_cluster(client=True) @@ -5239,12 +5237,9 @@ def test_unhashable_function(c, s, a, b): @gen_cluster() def test_client_name(s, a, b): - config['client-name'] = 'hello-world' - try: + with dask.config.set({'client-name': 'hello-world'}): c = yield Client(s.address, asynchronous=True) assert any("hello-world" in name for name in list(s.clients)) - finally: - del config['client-name'] yield c._close() @@ -5273,18 +5268,15 @@ def test_diagnostics_link_env_variable(loop): from distributed.bokeh.scheduler import BokehScheduler with cluster(scheduler_kwargs={'services': {('bokeh', 12355): BokehScheduler}}) as (s, [a, b]): with Client(s['address'], loop=loop) as c: - config['diagnostics-link'] = 'http://foo-{USER}:{port}/status' - try: + with dask.config.set({'distributed.dashboard.link': 'http://foo-{USER}:{port}/status'}): text = c._repr_html_() link = 'http://foo-' + os.environ['USER'] + ':12355/status' assert link in text - finally: - del config['diagnostics-link'] @gen_test() def test_client_timeout_2(): - with set_config({'connect-timeout': '10ms'}): + with dask.config.set({'distributed.comm.timeouts.connect': '10ms'}): start = time() c = Client('127.0.0.1:3755', asynchronous=True) with pytest.raises((TimeoutError, IOError)): diff --git a/distributed/tests/test_config.py b/distributed/tests/test_config.py index 14a32f14aa1..f14f07308c2 100644 --- a/distributed/tests/test_config.py +++ b/distributed/tests/test_config.py @@ -9,8 +9,8 @@ import pytest from distributed.utils_test import (captured_handler, captured_logger, - new_config, new_config_file, new_environment) -from distributed.config import initialize_logging, set_config, config, load_env_vars + new_config, new_config_file) +from distributed.config import initialize_logging def dump_logger_list(): @@ -120,6 +120,7 @@ def test_logging_simple(): with new_config_file(c): code = """if 1: import logging + import dask from distributed.utils_test import captured_handler @@ -139,7 +140,7 @@ def test_logging_simple(): assert distributed_log == [ "distributed.foo - INFO - 1: info", "distributed.foo.bar - ERROR - 3: error", - ] + ], (dask.config.config, distributed_log) """ subprocess.check_call([sys.executable, "-c", code]) @@ -272,35 +273,3 @@ def test_logging_file_config(): """ subprocess.check_call([sys.executable, "-c", code]) os.remove(logging_config.name) - - -def test_set_config(): - assert 'foo' not in config - with set_config(foo=1): - assert config['foo'] == 1 - assert 'foo' not in config - - -def test_load_env_vars(): - environment = dict( - DASK_STRING='test', - DASK_INT='20', - DASK_TRUE='True', - DASK_FALSE='false', - DASK_FLOAT='1.5', - NOT_FOR_DASK='__variable not used__' - ) - conf = {} - with new_environment(environment): - load_env_vars(conf) - assert conf['string'] == 'test' - assert conf['int'] == 20 - assert conf['true'] is True - assert conf['false'] is False - assert conf['float'] == 1.5 - assert isinstance(conf['string'], str) - assert isinstance(conf['int'], int) - assert isinstance(conf['float'], float) - assert isinstance(conf['true'], bool) - assert isinstance(conf['false'], bool) - assert '__variable not used__' not in conf.values() diff --git a/distributed/tests/test_diskutils.py b/distributed/tests/test_diskutils.py index 5cf2f4e03ef..598c2506b0d 100644 --- a/distributed/tests/test_diskutils.py +++ b/distributed/tests/test_diskutils.py @@ -10,11 +10,12 @@ import mock +import dask from distributed.compatibility import Empty from distributed.diskutils import WorkSpace from distributed.metrics import time from distributed.utils import mp_context -from distributed.utils_test import captured_logger, slow, new_config +from distributed.utils_test import captured_logger, slow def assert_directory_contents(dir_path, expected): @@ -153,7 +154,7 @@ def test_workspace_rmtree_failure(tmpdir): def test_locking_disabled(tmpdir): base_dir = str(tmpdir) - with new_config({'use-file-locking': False}): + with dask.config.set({'distributed.worker.use-file-locking': False}): with mock.patch('distributed.diskutils.locket.lock_file') as lock_file: assert_contents = functools.partial(assert_directory_contents, base_dir) diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index f518727c1a7..7f0e703b6c4 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -12,8 +12,8 @@ from toolz import valmap, first from tornado import gen +import dask from distributed import Nanny, rpc, Scheduler -from distributed.config import config from distributed.core import CommClosedError from distributed.metrics import time from distributed.protocol.pickle import dumps @@ -288,8 +288,7 @@ def test_avoid_memory_monitor_if_zero_limit(c, s): @gen_cluster(ncores=[], client=True) def test_scheduler_address_config(c, s): - config['scheduler-address'] = s.address - try: + with dask.config.set({'scheduler-address': s.address}): nanny = Nanny(loop=s.loop) yield nanny._start() assert nanny.scheduler.address == s.address @@ -299,8 +298,6 @@ def test_scheduler_address_config(c, s): yield gen.sleep(0.1) assert time() < start + 10 - finally: - del config['scheduler-address'] yield nanny._close() diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index cb854785f9f..5f411bcfcd2 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -15,7 +15,6 @@ import pytest from distributed import Nanny, Worker, Client, wait, fire_and_forget -from distributed.config import set_config from distributed.core import connect, rpc, CommClosedError from distributed.scheduler import Scheduler, BANDWIDTH from distributed.client import wait @@ -479,7 +478,7 @@ def test_worker_name(): @gen_test() def test_coerce_address(): - with set_config({'connect-timeout': '100ms'}): + with dask.config.set({'distributed.comm.timeouts.connect': '100ms'}): s = Scheduler(validate=True) s.start(0) print("scheduler:", s.address, s.listen_address) @@ -1040,7 +1039,7 @@ def test_scheduler_file(): @gen_cluster(client=True, ncores=[]) def test_non_existent_worker(c, s): - with set_config({'connect-timeout': '100ms'}): + with dask.config.set({'distributed.comm.timeouts.connect': '100ms'}): s.add_worker(address='127.0.0.1:5738', ncores=2, nbytes={}, host_info={}) futures = c.map(inc, range(10)) yield gen.sleep(0.300) @@ -1208,7 +1207,7 @@ def test_retries(c, s, a, b): @pytest.mark.xfail(reason="second worker also errant for some reason") @gen_cluster(client=True, ncores=[('127.0.0.1', 1)] * 3, timeout=5) def test_mising_data_errant_worker(c, s, w1, w2, w3): - with set_config({'connect-timeout': '1s'}): + with dask.config.set({'distributed.comm.timeouts.connect': '1s'}): np = pytest.importorskip('numpy') x = c.submit(np.random.random, 10000000, workers=w1.address) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 61b5b5aeae5..0de4efccaed 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -132,7 +132,7 @@ def test_wait_for_port(): def test_new_config(): c = config.copy() with new_config({'xyzzy': 5}): - assert config == {'xyzzy': 5} + config['xyzzy'] == 5 assert config == c assert 'xyzzy' not in config diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 36abcbf3f73..3417acb50e2 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -11,6 +11,7 @@ from time import sleep import traceback +import dask from dask import delayed import pytest from toolz import pluck, sliding_window, first @@ -21,7 +22,6 @@ from distributed import (Nanny, Client, get_client, wait, default_client, get_worker, Reschedule) from distributed.compatibility import WINDOWS, cache_from_source -from distributed.config import config from distributed.core import rpc from distributed.client import wait from distributed.scheduler import Scheduler @@ -1125,11 +1125,8 @@ def test_parse_memory_limit(s, w): @gen_cluster(ncores=[], client=True) def test_scheduler_address_config(c, s): - config['scheduler-address'] = s.address - try: + with dask.config.set({'scheduler-address': s.address}): worker = Worker(loop=s.loop) yield worker._start() assert worker.scheduler.address == s.address - finally: - del config['scheduler-address'] yield worker._close() diff --git a/distributed/utils.py b/distributed/utils.py index f25047966ab..de2247ba3df 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -32,6 +32,7 @@ except ImportError: resource = None +import dask from dask import istask from toolz import memoize, valmap import tornado @@ -39,7 +40,6 @@ from tornado.ioloop import IOLoop, PollIOLoop from .compatibility import Queue, PY3, PY2, get_thread_identity, unicode -from .config import config from .metrics import time @@ -56,7 +56,7 @@ def _initialize_mp_context(): if PY3 and not sys.platform.startswith('win') and 'PyPy' not in sys.version: - method = config.get('multiprocessing-method', 'forkserver') + method = dask.config.get('distributed.worker.multiprocessing-method') ctx = multiprocessing.get_context(method) # Makes the test suite much faster preload = ['distributed'] diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 637a2b28f4e..bb418449558 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -32,6 +32,7 @@ import pytest import six +import dask from dask.context import _globals from toolz import merge, memoize from tornado import gen, queues @@ -39,7 +40,7 @@ from tornado.ioloop import IOLoop from .compatibility import PY3, iscoroutinefunction, Empty -from .config import config, initialize_logging +from .config import initialize_logging from .core import connect, rpc, CommClosedError from .metrics import time from .proctitle import enable_proctitle_on_children @@ -705,9 +706,8 @@ def test_foo(scheduler, worker1, worker2): start end """ - config['nanny-start-timeout'] = '5s' - config['connect-timeout'] = '5s' del _global_workers[:] + dask.config.set({'distributed.comm.timeouts.connect': '5s'}) worker_kwargs = merge({'memory_limit': TOTAL_MEMORY, 'death_timeout': 5}, worker_kwargs) @@ -1072,10 +1072,13 @@ def new_config(new_config): """ Temporarily change configuration dictionary. """ + from .config import defaults + config = dask.config.config orig_config = config.copy() try: config.clear() - config.update(new_config) + config.update(defaults.copy()) + dask.config.update(config, new_config) initialize_logging(config) yield finally: diff --git a/distributed/worker.py b/distributed/worker.py index a639078e173..19bfa042109 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -13,6 +13,7 @@ import warnings import weakref +import dask from dask.core import istask from dask.compatibility import apply try: @@ -28,7 +29,6 @@ from .batched import BatchedSend from .comm import get_address_host, get_local_address_for from .comm.utils import offload -from .config import config, log_format from .compatibility import unicode, get_thread_identity, finalize from .core import (error_message, CommClosedError, rpc, pingpong, coerce_to_address) @@ -54,7 +54,7 @@ logger = logging.getLogger(__name__) -LOG_PDB = config.get('pdb-on-err') +LOG_PDB = dask.config.get('distributed.admin.pdb-on-err') no_value = '--no-value-sentinel--' @@ -90,8 +90,8 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, if scheduler_file: cfg = json_load_robust(scheduler_file) scheduler_addr = cfg['address'] - elif scheduler_ip is None and config.get('scheduler-address'): - scheduler_addr = config['scheduler-address'] + elif scheduler_ip is None and dask.config.get('scheduler-address', None): + scheduler_addr = dask.config.get('scheduler-address') elif scheduler_port is None: scheduler_addr = coerce_to_address(scheduler_ip) else: @@ -128,15 +128,15 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, if 'memory_target_fraction' in kwargs: self.memory_target_fraction = kwargs.pop('memory_target_fraction') else: - self.memory_target_fraction = config.get('worker-memory-target', 0.6) + self.memory_target_fraction = dask.config.get('distributed.worker.memory.target') if 'memory_spill_fraction' in kwargs: self.memory_spill_fraction = kwargs.pop('memory_spill_fraction') else: - self.memory_spill_fraction = config.get('worker-memory-spill', 0.7) + self.memory_spill_fraction = dask.config.get('distributed.worker.memory.spill') if 'memory_pause_fraction' in kwargs: self.memory_pause_fraction = kwargs.pop('memory_pause_fraction') else: - self.memory_pause_fraction = config.get('worker-memory-pause', 0.8) + self.memory_pause_fraction = dask.config.get('distributed.worker.memory.pause') if self.memory_limit: try: @@ -212,8 +212,8 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, setproctitle("dask-worker [not started]") def _setup_logging(self): - self._deque_handler = DequeHandler(n=config.get('log-length', 10000)) - self._deque_handler.setFormatter(logging.Formatter(log_format)) + self._deque_handler = DequeHandler(n=dask.config.get('distributed.admin.log-length')) + self._deque_handler.setFormatter(logging.Formatter(dask.config.get('distributed.admin.log-format'))) logger.addHandler(self._deque_handler) finalize(self, logger.removeHandler, self._deque_handler) @@ -1101,7 +1101,7 @@ def __init__(self, *args, **kwargs): self.long_running = set() self.batched_stream = None - self.recent_messages_log = deque(maxlen=config.get('recent-messages-log-length', 0)) + self.recent_messages_log = deque(maxlen=dask.config.get('distributed.comm.recent-messages-log-length')) self.target_message_size = 50e6 # 50 MB self.log = deque(maxlen=100000) @@ -1137,14 +1137,14 @@ def __init__(self, *args, **kwargs): self._client = None profile_cycle_interval = kwargs.pop('profile_cycle_interval', - config.get('profile-cycle-interval', 1000)) + dask.config.get('distributed.worker.profile.cycle')) profile_cycle_interval = parse_timedelta(profile_cycle_interval, default='ms') WorkerBase.__init__(self, *args, **kwargs) pc = PeriodicCallback( self.trigger_profile, - parse_timedelta(config.get('profile-interval', 10), default='ms') * 1000, + parse_timedelta(dask.config.get('distributed.worker.profile.interval'), default='ms') * 1000, io_loop=self.io_loop ) self.periodic_callbacks['profile'] = pc diff --git a/requirements.txt b/requirements.txt index b0b250f2a19..02489c7ee6b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ zict >= 0.1.3 # Compatibility packages futures; python_version < '3.0' singledispatch; python_version < '3.4' +pyyaml