diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index b719bbda12b..9e01ad93712 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -1,6 +1,7 @@ from __future__ import print_function, division, absolute_import import atexit +import dask import logging import os import shutil @@ -58,7 +59,7 @@ "cluster is on a shared network file system.") @click.option('--local-directory', default='', type=str, help="Directory to place scheduler files") -@click.option('--preload', type=str, multiple=True, is_eager=True, +@click.option('--preload', type=str, multiple=True, is_eager=True, default='', help='Module that should be loaded by the scheduler process ' 'like "foo.bar" or "/path/to/foo.py".') @click.argument('preload_argv', nargs=-1, @@ -125,6 +126,10 @@ def del_pid_file(): scheduler_file=scheduler_file, security=sec) scheduler.start(addr) + if not preload: + preload = dask.config.get('distributed.scheduler.preload') + if not preload_argv: + preload_argv = dask.config.get('distributed.scheduler.preload-argv') preload_modules(preload, parameter=scheduler, file_dir=local_directory, argv=preload_argv) logger.info('Local Directory: %26s', local_directory) diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 31bd73e9663..b75bbd650ae 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -15,6 +15,8 @@ distributed: transition-log-length: 100000 work-stealing: True # workers should steal tasks from each other worker-ttl: null # like '60s'. Time to live for workers. They must heartbeat faster than this + preload: [] + preload-argv: [] worker: multiprocessing-method: forkserver @@ -22,6 +24,8 @@ distributed: connections: # Maximum concurrent connections for data outgoing: 50 # This helps to control network saturation incoming: 10 + preload: [] + preload-argv: [] profile: interval: 10ms # Time between statistical profiling queries diff --git a/distributed/worker.py b/distributed/worker.py index 43b3ee13ed2..5709501d180 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -87,7 +87,7 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, services=None, service_ports=None, name=None, reconnect=True, memory_limit='auto', executor=None, resources=None, silence_logs=None, - death_timeout=None, preload=(), preload_argv=[], security=None, + death_timeout=None, preload=None, preload_argv=None, security=None, contact_address=None, memory_monitor_interval='200ms', extensions=None, metrics=None, **kwargs): @@ -108,7 +108,11 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, self.available_resources = (resources or {}).copy() self.death_timeout = death_timeout self.preload = preload - self.preload_argv = preload_argv, + if self.preload is None: + self.preload = dask.config.get('distributed.worker.preload') + self.preload_argv = preload_argv + if self.preload_argv is None: + self.preload_argv = dask.config.get('distributed.worker.preload-argv') self.contact_address = contact_address self.memory_monitor_interval = parse_timedelta(memory_monitor_interval, default='ms') self.extensions = dict()