diff --git a/dask_jobqueue/__init__.py b/dask_jobqueue/__init__.py index b9aff684..711521f9 100644 --- a/dask_jobqueue/__init__.py +++ b/dask_jobqueue/__init__.py @@ -1,5 +1,6 @@ # flake8: noqa from . import config +from .cluster_manager import ClusterManager from .core import JobQueueCluster from .moab import MoabCluster from .pbs import PBSCluster diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py new file mode 100644 index 00000000..15e82bac --- /dev/null +++ b/dask_jobqueue/cluster_manager.py @@ -0,0 +1,114 @@ +import logging + +from tornado import gen +from distributed.deploy import Cluster +from distributed.utils import log_errors + +logger = logging.getLogger(__name__) + + +class ClusterManager(Cluster): + """ Intermediate Cluster object that should lead to a real ClusterManager + + This tries to improve upstream Cluster object and underlines needs for + better decoupling between ClusterManager and Scheduler object + + This currently expects a local Scheduler defined on the object, but should + eventually only rely on RPC calls on remote or local scheduler. + It provides common methods and an IPython widget display. + + Clusters inheriting from this class should provide the following: + + 1. A local ``Scheduler`` object at ``.scheduler``. In the future, just + a URL to local or remote scheduler. + 2. scale_up and scale_down methods as defined below:: + + def scale_up(self, n: int): + ''' Brings total worker count up to ``n`` ''' + + def scale_down(self, workers: List[str], n: int): + ''' Close the workers with the given addresses or remove pending + workers to match n running workers. + ''' + + This will provide a general ``scale`` method as well as an IPython widget + for display. + + Things that will need to change for the complete Cluster Manager Design: + - ClusterManager: + - Use its own event loop, or the notebook one. + - Connect to a local or remote Scheduler through RPC, and then + communicate with it. 
+ - Ability to start a local or remote scheduler. + - Scheduler + - Provide some remote methods: + - retire_workers(n: int): close enough workers to have only n + running at the end. Return the closed workers. + + Examples + -------- + + >>> from distributed.deploy import Cluster + >>> class MyCluster(Cluster): + ... def scale_up(self, n): + ... ''' Bring the total worker count up to n ''' + ... pass + ... def scale_down(self, workers, n=None): + ... ''' Close the workers with the given addresses ''' + ... pass + + >>> cluster = MyCluster() + >>> cluster.scale(5) # scale manually + >>> cluster.adapt(minimum=1, maximum=100) # scale automatically + """ + + def __init__(self): + self._target_scale = 0 + + @gen.coroutine + def _scale(self, n): + """ Asynchronously called scale method + + This allows to do every operation with a coherent context + """ + with log_errors(): + # here we rely on a ClusterManager attribute to retrieve the + # active and pending workers + if n == self._target_scale: + pass + elif n > self._target_scale: + self.scale_up(n) + else: + # TODO to_close may be empty if some workers are pending + # This may not be useful to call scheduler methods in this case + # Scheduler interface here may need to be modified + to_close = self.scheduler.workers_to_close( + n=len(self.scheduler.workers) - n) + logger.debug("Closing workers: %s", to_close) + # Should be an RPC call here + yield self.scheduler.retire_workers(workers=to_close) + # To close may be empty if just asking to remove pending + # workers, so we should also give a target number + self.scale_down(to_close, n) + self._target_scale = n + + def scale(self, n): + """ Scale cluster to n workers + + Parameters + ---------- + n: int + Target number of workers + + Example + ------- + >>> cluster.scale(10) # scale cluster to ten workers + + See Also + -------- + Cluster.scale_up + Cluster.scale_down + """ + # TODO we should not rely on scheduler loop here, self should have its + # own loop + 
self.scheduler.loop.add_callback(self._scale, n) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 176bb2a3..b14be675 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -15,8 +15,8 @@ import dask import docrep +from .cluster_manager import ClusterManager from distributed import LocalCluster -from distributed.deploy import Cluster from distributed.diagnostics.plugin import SchedulerPlugin from distributed.utils import format_bytes, parse_bytes, tmpfile, get_ip_interface @@ -92,7 +92,7 @@ def remove_worker(self, scheduler=None, worker=None, **kwargs): @docstrings.get_sectionsf('JobQueueCluster') -class JobQueueCluster(Cluster): +class JobQueueCluster(ClusterManager): """ Base class to launch Dask Clusters for Job queues This class should not be used directly, use inherited class appropriate for your queueing system (e.g. PBScluster @@ -179,6 +179,8 @@ def __init__(self, # """ # This initializer should be considered as Abstract, and never used directly. # """ + super(JobQueueCluster, self).__init__() + if threads is not None: raise ValueError(threads_deprecation_message) @@ -263,8 +265,6 @@ def __init__(self, self._command_template = ' '.join(map(str, command_args)) - self._target_scale = 0 - self.log_directory = log_directory if self.log_directory is not None: if not os.path.exists(self.log_directory): @@ -413,21 +413,12 @@ def scale_up(self, n, **kwargs): active_and_pending = self._count_active_and_pending_workers() if n >= active_and_pending: logger.debug("Scaling up to %d workers.", n) - self.start_workers(n - self._count_active_and_pending_workers()) + self.start_workers(n - active_and_pending) else: - n_to_close = active_and_pending - n - if n_to_close < self._count_pending_workers(): - # We only need to kill some pending jobs, this is actually a - # scale down bu could not be handled upstream - to_kill = int(n_to_close / self.worker_processes) - jobs = list(self.pending_jobs.keys())[to_kill:] - self.stop_jobs(jobs) - else: - # We should not end 
here, a new scale call should not begin - # until a scale_up or scale_down has ended - raise RuntimeError('JobQueueCluster.scale_up was called with' - ' a number of worker lower than the ' - 'currently connected workers') + # scale_up should not be called if n < active + pending jobs + logger.warning('JobQueueCluster.scale_up was called with a' + ' number of workers lower than what is already' + ' running or pending') def _count_active_and_pending_workers(self): active_and_pending = (self._count_active_workers() + @@ -444,17 +435,34 @@ def _count_active_workers(self): def _count_pending_workers(self): return self.worker_processes * len(self.pending_jobs) - def scale_down(self, workers): + def scale_down(self, workers, n=None): ''' Close the workers with the given addresses ''' - logger.debug("Scaling down. Workers: %s", workers) - worker_states = [] - for w in workers: - try: - # Get the actual WorkerState - worker_states.append(self.scheduler.workers[w]) - except KeyError: - logger.debug('worker %s is already gone', w) - self.stop_workers(worker_states) + if n is None: + # Adaptive currently calls directly scale_down, we need to handle this + # Need to only keep active workers minus those adaptive wants to stop + n = self._count_active_workers() - len(workers) + logger.debug("Scaling down to %d Workers: %s", n, workers) + active_and_pending = self._count_active_and_pending_workers() + n_to_close = active_and_pending - n + if n_to_close < 0: + logger.warning('JobQueueCluster.scale_down was called with' + ' a number of worker greater than what is' + ' already running or pending.') + elif n_to_close <= self._count_pending_workers(): + # Only pending jobs need killing; note [-0:] would select them all + to_kill = int(n_to_close / self.worker_processes) + jobs = list(self.pending_jobs.keys())[-to_kill:] if to_kill else [] + logger.debug("%d jobs to stop, stopping jobs %s", to_kill, jobs) + self.stop_jobs(jobs) + else: + worker_states = [] + for w in workers: + try: + # Get the actual WorkerState + 
worker_states.append(self.scheduler.workers[w]) + except KeyError: + logger.debug('worker %s is already gone', w) + self.stop_workers(worker_states) def stop_all_jobs(self): ''' Stops all running and pending jobs ''' @@ -492,7 +500,7 @@ def _job_id_from_submit_output(self, out): job_id = match.groupdict().get('job_id') if job_id is None: msg = ("You need to use a 'job_id' named group in your regexp, e.g. " - "r'(?P<job_id>\d+)', in your regexp. Your regexp was: " + "r'(?P<job_id>\\d+)', in your regexp. Your regexp was: " "{!r}".format(self.job_id_regexp)) raise ValueError(msg) diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index db016f5f..5ffb1c1d 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -119,7 +119,6 @@ def test_basic(loop): @pytest.mark.env("pbs") # noqa: F811 -@pytest.mark.skip(reason="Current scale method not capable of doing this") def test_basic_scale_edge_cases(loop): with PBSCluster(walltime='00:02:00', processes=1, cores=2, memory='2GB', local_directory='/tmp', job_extra=['-V'], loop=loop) as cluster: @@ -129,6 +128,10 @@ def test_basic_scale_edge_cases(loop): # Wait to see what happens sleep(0.2) + start = time() + while cluster.pending_jobs or cluster.running_jobs: + sleep(0.1) + assert time() < start + QUEUE_WAIT assert not(cluster.pending_jobs or cluster.running_jobs)