From 654cc92cc26056026bc27ed9e7254def3a98f037 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Mon, 29 Oct 2018 11:58:55 +0100 Subject: [PATCH 1/2] Add preload script to conf --- distributed/cli/dask_scheduler.py | 7 ++++++- distributed/distributed.yaml | 4 ++++ distributed/worker.py | 8 ++++++-- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index b719bbda12b..702ea601dee 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -1,6 +1,7 @@ from __future__ import print_function, division, absolute_import import atexit +import dask import logging import os import shutil @@ -58,7 +59,7 @@ "cluster is on a shared network file system.") @click.option('--local-directory', default='', type=str, help="Directory to place scheduler files") -@click.option('--preload', type=str, multiple=True, is_eager=True, +@click.option('--preload', type=str, multiple=True, is_eager=True, default='', help='Module that should be loaded by the scheduler process ' 'like "foo.bar" or "/path/to/foo.py".') @click.argument('preload_argv', nargs=-1, @@ -125,6 +126,10 @@ def del_pid_file(): scheduler_file=scheduler_file, security=sec) scheduler.start(addr) + if not preload: + preload = dask.config.get('distributed.scheduler.preloads') + if not preload_argv: + preload_argv = dask.config.get('distributed.scheduler.preloads-argv') preload_modules(preload, parameter=scheduler, file_dir=local_directory, argv=preload_argv) logger.info('Local Directory: %26s', local_directory) diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 31bd73e9663..cbf4d496005 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -15,6 +15,8 @@ distributed: transition-log-length: 100000 work-stealing: True # workers should steal tasks from each other worker-ttl: null # like '60s'. Time to live for workers. They must heartbeat faster than this + preloads: [] + preloads-argv: [] worker: multiprocessing-method: forkserver @@ -22,6 +24,8 @@ distributed: connections: # Maximum concurrent connections for data outgoing: 50 # This helps to control network saturation incoming: 10 + preloads: [] + preloads-argv: [] profile: interval: 10ms # Time between statistical profiling queries diff --git a/distributed/worker.py b/distributed/worker.py index 43b3ee13ed2..a2d4524295d 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -87,7 +87,7 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, services=None, service_ports=None, name=None, reconnect=True, memory_limit='auto', executor=None, resources=None, silence_logs=None, - death_timeout=None, preload=(), preload_argv=[], security=None, + death_timeout=None, preload=None, preload_argv=None, security=None, contact_address=None, memory_monitor_interval='200ms', extensions=None, metrics=None, **kwargs): @@ -108,7 +108,11 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, self.available_resources = (resources or {}).copy() self.death_timeout = death_timeout self.preload = preload - self.preload_argv = preload_argv, + if self.preload is None: + self.preload = dask.config.get('distributed.worker.preloads') + self.preload_argv = preload_argv + if self.preload_argv is None: + self.preload_argv = dask.config.get('distributed.worker.preloads-argv') self.contact_address = contact_address self.memory_monitor_interval = parse_timedelta(memory_monitor_interval, default='ms') self.extensions = dict() From 310c02aeabfab8ab81587888d75a458cdd367fe6 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Sat, 3 Nov 2018 15:13:00 +0100 Subject: [PATCH 2/2] preload without s --- distributed/cli/dask_scheduler.py | 4 ++-- distributed/distributed.yaml | 8 ++++---- distributed/worker.py | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index 702ea601dee..9e01ad93712 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -127,9 +127,9 @@ def del_pid_file(): security=sec) scheduler.start(addr) if not preload: - preload = dask.config.get('distributed.scheduler.preloads') + preload = dask.config.get('distributed.scheduler.preload') if not preload_argv: - preload_argv = dask.config.get('distributed.scheduler.preloads-argv') + preload_argv = dask.config.get('distributed.scheduler.preload-argv') preload_modules(preload, parameter=scheduler, file_dir=local_directory, argv=preload_argv) logger.info('Local Directory: %26s', local_directory) diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index cbf4d496005..b75bbd650ae 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -15,8 +15,8 @@ distributed: transition-log-length: 100000 work-stealing: True # workers should steal tasks from each other worker-ttl: null # like '60s'. Time to live for workers. They must heartbeat faster than this - preloads: [] - preloads-argv: [] + preload: [] + preload-argv: [] worker: multiprocessing-method: forkserver @@ -24,8 +24,8 @@ distributed: connections: # Maximum concurrent connections for data outgoing: 50 # This helps to control network saturation incoming: 10 - preloads: [] - preloads-argv: [] + preload: [] + preload-argv: [] profile: interval: 10ms # Time between statistical profiling queries diff --git a/distributed/worker.py b/distributed/worker.py index a2d4524295d..5709501d180 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -109,10 +109,10 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, self.death_timeout = death_timeout self.preload = preload if self.preload is None: - self.preload = dask.config.get('distributed.worker.preloads') + self.preload = dask.config.get('distributed.worker.preload') self.preload_argv = preload_argv if self.preload_argv is None: - self.preload_argv = dask.config.get('distributed.worker.preloads-argv') + self.preload_argv = dask.config.get('distributed.worker.preload-argv') self.contact_address = contact_address self.memory_monitor_interval = parse_timedelta(memory_monitor_interval, default='ms') self.extensions = dict()