From 6750f0106447f368fa0d2c32b860cc66783bccd3 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Sat, 6 Oct 2018 22:55:33 +0200 Subject: [PATCH 1/9] Fix scale edge cases --- dask_jobqueue/__init__.py | 1 + dask_jobqueue/cluster_manager.py | 55 ++++++++++++++++++++++++++++++++ dask_jobqueue/core.py | 4 +-- 3 files changed, 58 insertions(+), 2 deletions(-) create mode 100644 dask_jobqueue/cluster_manager.py diff --git a/dask_jobqueue/__init__.py b/dask_jobqueue/__init__.py index b9aff684..711521f9 100644 --- a/dask_jobqueue/__init__.py +++ b/dask_jobqueue/__init__.py @@ -1,5 +1,6 @@ # flake8: noqa from . import config +from .cluster_manager import ClusterManager from .core import JobQueueCluster from .moab import MoabCluster from .pbs import PBSCluster diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py new file mode 100644 index 00000000..0f1422c4 --- /dev/null +++ b/dask_jobqueue/cluster_manager.py @@ -0,0 +1,55 @@ +import logging + +from distributed.deploy import Cluster + +from distributed.utils import log_errors + +logger = logging.getLogger(__name__) + + +class ClusterManager(Cluster): + """ Intermediate Cluster object that should lead to a real ClusterManager + + This tries to improve upstream Cluster object and underlines needs for + better decoupling between ClusterManager and Scheduler object + """ + + def _scale(self, n): + """ Asynchronously called scale method + + This allows to do every operation with a coherent ocntext + """ + with log_errors(): + # TODO here we should rely on a ClusterManager func to retrieve the + # active and pending workers + if n >= len(self.scheduler.workers): + self.scale_up(n) + else: + to_close = self.scheduler.workers_to_close( + n=len(self.scheduler.workers) - n) + logger.debug("Closing workers: %s", to_close) + self.scheduler.retire_workers(workers=to_close) + # TODO To close may be empty if just asking to remove pending + # workers, so we should also give a target number + 
self.scale_down(to_close) + + def scale(self, n): + """ Scale cluster to n workers + + Parameters + ---------- + n: int + Target number of workers + + Example + ------- + >>> cluster.scale(10) # scale cluster to ten workers + + See Also + -------- + Cluster.scale_up + Cluster.scale_down + """ + # TODO we should not rely on scheduler loop here, self should have its + # own loop + self.scheduler.loop.add_callback(self._scale(n)) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 6d587027..278caa3b 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -12,8 +12,8 @@ import dask import docrep +from . import ClusterManager from distributed import LocalCluster -from distributed.deploy import Cluster from distributed.diagnostics.plugin import SchedulerPlugin from distributed.utils import format_bytes, parse_bytes, tmpfile, get_ip_interface @@ -88,7 +88,7 @@ def remove_worker(self, scheduler=None, worker=None, **kwargs): @docstrings.get_sectionsf('JobQueueCluster') -class JobQueueCluster(Cluster): +class JobQueueCluster(ClusterManager): """ Base class to launch Dask Clusters for Job queues This class should not be used directly, use inherited class appropriate for your queueing system (e.g. 
PBScluster From 5e834c28f0d2d9d2437a4bb6d41689f61e50f541 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Sun, 7 Oct 2018 10:12:47 +0200 Subject: [PATCH 2/9] enabling scale edge case test --- dask_jobqueue/tests/test_pbs.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index db016f5f..8c97b4af 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -119,7 +119,6 @@ def test_basic(loop): @pytest.mark.env("pbs") # noqa: F811 -@pytest.mark.skip(reason="Current scale method not capable of doing this") def test_basic_scale_edge_cases(loop): with PBSCluster(walltime='00:02:00', processes=1, cores=2, memory='2GB', local_directory='/tmp', job_extra=['-V'], loop=loop) as cluster: From 189b263e0cffa05e54a413125d03573160d24f4b Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Sun, 7 Oct 2018 10:35:04 +0200 Subject: [PATCH 3/9] use add_callback correctly --- dask_jobqueue/cluster_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py index 0f1422c4..d0eaa8c5 100644 --- a/dask_jobqueue/cluster_manager.py +++ b/dask_jobqueue/cluster_manager.py @@ -52,4 +52,4 @@ def scale(self, n): """ # TODO we should not rely on scheduler loop here, self should have its # own loop - self.scheduler.loop.add_callback(self._scale(n)) + self.scheduler.loop.add_callback(self._scale, n) From e8b2fb18e91cd34f0c4b0fb0ed8aeab2ee5c0cfd Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Thu, 11 Oct 2018 14:07:29 +0000 Subject: [PATCH 4/9] Fix concurrency issue and other code pieces --- dask_jobqueue/cluster_manager.py | 18 +++++++--- dask_jobqueue/core.py | 56 +++++++++++++++++--------------- 2 files changed, 43 insertions(+), 31 deletions(-) diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py index d0eaa8c5..0fbec3c5 100644 --- a/dask_jobqueue/cluster_manager.py +++ 
b/dask_jobqueue/cluster_manager.py @@ -1,4 +1,5 @@ import logging +import threading from distributed.deploy import Cluster @@ -14,24 +15,31 @@ class ClusterManager(Cluster): better decoupling between ClusterManager and Scheduler object """ + def __init__(self): + self._target_scale = 0 + self._lock = threading.Lock() + def _scale(self, n): """ Asynchronously called scale method This allows to do every operation with a coherent ocntext """ - with log_errors(): - # TODO here we should rely on a ClusterManager func to retrieve the + with log_errors(), self._lock: + # here we rely on a ClusterManager attribute to retrieve the # active and pending workers - if n >= len(self.scheduler.workers): + if n == self._target_scale: + pass + elif n > self._target_scale: self.scale_up(n) else: to_close = self.scheduler.workers_to_close( n=len(self.scheduler.workers) - n) logger.debug("Closing workers: %s", to_close) self.scheduler.retire_workers(workers=to_close) - # TODO To close may be empty if just asking to remove pending + # To close may be empty if just asking to remove pending # workers, so we should also give a target number - self.scale_down(to_close) + self.scale_down(n, to_close) + self._target_scale = n def scale(self, n): """ Scale cluster to n workers diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 6816d5c5..359e737e 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -175,6 +175,8 @@ def __init__(self, # """ # This initializer should be considered as Abstract, and never used directly. 
# """ + super(JobQueueCluster, self).__init__() + if threads is not None: raise ValueError(threads_deprecation_message) @@ -257,8 +259,6 @@ def __init__(self, self._command_template = ' '.join(map(str, command_args)) - self._target_scale = 0 - def __repr__(self): running_workers = self._count_active_workers() running_cores = running_workers * self.worker_threads @@ -396,21 +396,12 @@ def scale_up(self, n, **kwargs): active_and_pending = self._count_active_and_pending_workers() if n >= active_and_pending: logger.debug("Scaling up to %d workers.", n) - self.start_workers(n - self._count_active_and_pending_workers()) + self.start_workers(n - active_and_pending) else: - n_to_close = active_and_pending - n - if n_to_close < self._count_pending_workers(): - # We only need to kill some pending jobs, this is actually a - # scale down bu could not be handled upstream - to_kill = int(n_to_close / self.worker_processes) - jobs = list(self.pending_jobs.keys())[to_kill:] - self.stop_jobs(jobs) - else: - # We should not end here, a new scale call should not begin - # until a scale_up or scale_down has ended - raise RuntimeError('JobQueueCluster.scale_up was called with' - ' a number of worker lower than the ' - 'currently connected workers') + # scale_up should not be called if n < active + pending jobs + raise RuntimeError('JobQueueCluster.scale_up was called with a' + ' number of workers lower that what is already' + ' running or pending') def _count_active_and_pending_workers(self): active_and_pending = (self._count_active_workers() + @@ -427,17 +418,30 @@ def _count_active_workers(self): def _count_pending_workers(self): return self.worker_processes * len(self.pending_jobs) - def scale_down(self, workers): + def scale_down(self, n, workers): ''' Close the workers with the given addresses ''' - logger.debug("Scaling down. 
Workers: %s", workers) - worker_states = [] - for w in workers: - try: - # Get the actual WorkerState - worker_states.append(self.scheduler.workers[w]) - except KeyError: - logger.debug('worker %s is already gone', w) - self.stop_workers(worker_states) + logger.debug("Scaling down to %d Workers: %s", n, workers) + active_and_pending = self._count_active_and_pending_workers() + n_to_close = active_and_pending - n + if n_to_close < 0: + raise RuntimeError('JobQueueCluster.scale_down was called with' + ' a number of worker greater than what is' + ' already running or pending.') + elif n_to_close <= self._count_pending_workers(): + # We only need to kill some pending jobs, + to_kill = int(n_to_close / self.worker_processes) + jobs = list(self.pending_jobs.keys())[-to_kill:] + logger.debug("%d jobs to stop, stoppubg jobs %s", to_kill, jobs) + self.stop_jobs(jobs) + else: + worker_states = [] + for w in workers: + try: + # Get the actual WorkerState + worker_states.append(self.scheduler.workers[w]) + except KeyError: + logger.debug('worker %s is already gone', w) + self.stop_workers(worker_states) def stop_all_jobs(self): ''' Stops all running and pending jobs ''' From 0be29bef859b9fa497916e258aec37e0e81d52e2 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Fri, 12 Oct 2018 09:38:51 +0000 Subject: [PATCH 5/9] wait some time for jobs to be cancelled --- dask_jobqueue/tests/test_pbs.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index 8c97b4af..5ffb1c1d 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -128,6 +128,10 @@ def test_basic_scale_edge_cases(loop): # Wait to see what happens sleep(0.2) + start = time() + while cluster.pending_jobs or cluster.running_jobs: + sleep(0.1) + assert time() < start + QUEUE_WAIT assert not(cluster.pending_jobs or cluster.running_jobs) From 381dcdcd2be140a24322a5bc21b5a152b3e37da0 Mon Sep 17 00:00:00 2001 From: Guillaume EB 
Date: Fri, 12 Oct 2018 23:54:05 +0200 Subject: [PATCH 6/9] using coroutine to yield scheduler calls. Removing lock --- dask_jobqueue/cluster_manager.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py index 0fbec3c5..cdf9a9a3 100644 --- a/dask_jobqueue/cluster_manager.py +++ b/dask_jobqueue/cluster_manager.py @@ -1,8 +1,7 @@ import logging -import threading +from tornado import gen from distributed.deploy import Cluster - from distributed.utils import log_errors logger = logging.getLogger(__name__) @@ -17,14 +16,14 @@ class ClusterManager(Cluster): def __init__(self): self._target_scale = 0 - self._lock = threading.Lock() + @gen.coroutine def _scale(self, n): """ Asynchronously called scale method This allows to do every operation with a coherent ocntext """ - with log_errors(), self._lock: + with log_errors(): # here we rely on a ClusterManager attribute to retrieve the # active and pending workers if n == self._target_scale: @@ -35,7 +34,8 @@ def _scale(self, n): to_close = self.scheduler.workers_to_close( n=len(self.scheduler.workers) - n) logger.debug("Closing workers: %s", to_close) - self.scheduler.retire_workers(workers=to_close) + # Should be an RPC call here + yield self.scheduler.retire_workers(workers=to_close) # To close may be empty if just asking to remove pending # workers, so we should also give a target number self.scale_down(n, to_close) From b8c7836220a8c352f9374bfe7fb480df124d4aff Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Sat, 20 Oct 2018 20:56:38 +0000 Subject: [PATCH 7/9] some more adaptive and scale tests --- dask_jobqueue/cluster_manager.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py index cdf9a9a3..23d382f3 100644 --- a/dask_jobqueue/cluster_manager.py +++ b/dask_jobqueue/cluster_manager.py @@ -31,6 +31,9 @@ def _scale(self, n): elif n > 
self._target_scale: self.scale_up(n) else: + # TODO to_close may be empty if some workers are pending + # This may not be useful to call scheduler methods in this case + # Scheduler interface here may need to be modified to_close = self.scheduler.workers_to_close( n=len(self.scheduler.workers) - n) logger.debug("Closing workers: %s", to_close) @@ -38,7 +41,7 @@ def _scale(self, n): yield self.scheduler.retire_workers(workers=to_close) # To close may be empty if just asking to remove pending # workers, so we should also give a target number - self.scale_down(n, to_close) + self.scale_down(to_close, n) self._target_scale = n def scale(self, n): From 0a2d2ce50f0ae596d713aff5f8deefebea2b0834 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Mon, 29 Oct 2018 10:54:40 +0100 Subject: [PATCH 8/9] Comments on cluster Manager design need --- dask_jobqueue/cluster_manager.py | 48 ++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py index 23d382f3..15e82bac 100644 --- a/dask_jobqueue/cluster_manager.py +++ b/dask_jobqueue/cluster_manager.py @@ -12,6 +12,54 @@ class ClusterManager(Cluster): This tries to improve upstream Cluster object and underlines needs for better decoupling between ClusterManager and Scheduler object + + This currently expects a local Scheduler defined on the object, but should + eventually only rely on RPC calls on remote or local scheduler. + It provides common methods and an IPython widget display. + + Clusters inheriting from this class should provide the following: + + 1. A local ``Scheduler`` object at ``.scheduler``. In the future, just + a URL to local or remote scheduler. + 2. 
scale_up and scale_down methods as defined below:: + + def scale_up(self, n: int): + ''' Brings total worker count up to ``n`` ''' + + def scale_down(self, workers: List[str], n: int): + ''' Close the workers with the given addresses or remove pending + workers to match n running workers. + ''' + + This will provide a general ``scale`` method as well as an IPython widget + for display. + + Things that will need to change for the complete Cluster Manager Design: + - ClusterManager: + - Use its own event loop, or the notebook one. + - Connect to a local or remote Scheduler through RPC, and then + communicate with it. + - Ability to start a local or remote scheduler. + - Scheduler + - Provide some remote methods: + - retire_workers(n: int): close enough workers to have only n + running at the end. Return the closed workers. + + Examples + -------- + + >>> from distributed.deploy import Cluster + >>> class MyCluster(Cluster): + ... def scale_up(self, n): + ... ''' Bring the total worker count up to n ''' + ... pass + ... def scale_down(self, workers, n=None): + ... ''' Close the workers with the given addresses ''' + ... pass + + >>> cluster = MyCluster() + >>> cluster.scale(5) # scale manually + >>> cluster.adapt(minimum=1, maximum=100) # scale automatically """ def __init__(self): From 383854e2f675dfa6bcb291d0f7b15395f9b01032 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Mon, 29 Oct 2018 11:29:59 +0100 Subject: [PATCH 9/9] new flake check --- dask_jobqueue/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 169ee94b..b14be675 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -500,7 +500,7 @@ def _job_id_from_submit_output(self, out): job_id = match.groupdict().get('job_id') if job_id is None: msg = ("You need to use a 'job_id' named group in your regexp, e.g. " - "r'(?P\d+)', in your regexp. Your regexp was: " + "r'(?P\\d+)', in your regexp. 
Your regexp was: " "{!r}".format(self.job_id_regexp)) raise ValueError(msg)