Skip to content
Merged
1 change: 1 addition & 0 deletions dask_jobqueue/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# flake8: noqa
from . import config
from .cluster_manager import ClusterManager
from .core import JobQueueCluster
from .moab import MoabCluster
from .pbs import PBSCluster
Expand Down
114 changes: 114 additions & 0 deletions dask_jobqueue/cluster_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import logging

from tornado import gen
from distributed.deploy import Cluster
from distributed.utils import log_errors

logger = logging.getLogger(__name__)


class ClusterManager(Cluster):
""" Intermediate Cluster object that should lead to a real ClusterManager

This tries to improve upstream Cluster object and underlines needs for
better decoupling between ClusterManager and Scheduler object

This currently expects a local Scheduler defined on the object, but should
eventually only rely on RPC calls on remote or local scheduler.
It provides common methods and an IPython widget display.

Clusters inheriting from this class should provide the following:

1. A local ``Scheduler`` object at ``.scheduler``. In the future, just
a URL to local or remote scheduler.
2. scale_up and scale_down methods as defined below::

def scale_up(self, n: int):
''' Brings total worker count up to ``n`` '''

def scale_down(self, workers: List[str], n: int):
''' Close the workers with the given addresses or remove pending
workers to match n running workers.
'''

This will provide a general ``scale`` method as well as an IPython widget
for display.

Things the will need to change for the complete Cluster Manager Design:
- ClusterManager:
- Use it's own event loop, or the notebook one.
- Connect to a local or remote Scheduler through RPC, and then
communicate with it.
- Ability to start a local or remote scheduler.
- Scheduler
- Provide some remote methods:
- retire_workers(n: int): close enough workers ot have only n
running at the end. Return the closed workers.

Examples
--------

>>> from distributed.deploy import Cluster
>>> class MyCluster(cluster):
... def scale_up(self, n):
... ''' Bring the total worker count up to n '''
... pass
... def scale_down(self, workers, n=None):
... ''' Close the workers with the given addresses '''
... pass

>>> cluster = MyCluster()
>>> cluster.scale(5) # scale manually
>>> cluster.adapt(minimum=1, maximum=100) # scale automatically
"""

def __init__(self):
self._target_scale = 0

@gen.coroutine
def _scale(self, n):
""" Asynchronously called scale method

This allows to do every operation with a coherent ocntext
"""
with log_errors():
# here we rely on a ClusterManager attribute to retrieve the
# active and pending workers
if n == self._target_scale:
pass
elif n > self._target_scale:
self.scale_up(n)
else:
# TODO to_close may be empty if some workers are pending
# This may not be useful to call scheduler methods in this case
# Scheduler interface here may need to be modified
to_close = self.scheduler.workers_to_close(
n=len(self.scheduler.workers) - n)
logger.debug("Closing workers: %s", to_close)
# Should be an RPC call here
yield self.scheduler.retire_workers(workers=to_close)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a warning, now that you've added a yield in this coroutine it's entirely possible for another coroutine to start running while this one waits for a response. It is entirely possible that two _scale coroutines will be active at the same time.

You still can't use a threading.Lock to fix this (threading locks will lock the entire event loop). You can use a Tornado lock, or a few other methods. Short term I wouldn't worry about it though.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the precision. I don't think this is an issue yet.

# To close may be empty if just asking to remove pending
# workers, so we should also give a target number
self.scale_down(to_close, n)
self._target_scale = n

def scale(self, n):
""" Scale cluster to n workers

Parameters
----------
n: int
Target number of workers

Example
-------
>>> cluster.scale(10) # scale cluster to ten workers

See Also
--------
Cluster.scale_up
Cluster.scale_down
"""
# TODO we should not rely on scheduler loop here, self should have its
# own loop
self.scheduler.loop.add_callback(self._scale, n)
66 changes: 37 additions & 29 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

import dask
import docrep
from . import ClusterManager
from distributed import LocalCluster
from distributed.deploy import Cluster
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.utils import format_bytes, parse_bytes, tmpfile, get_ip_interface

Expand Down Expand Up @@ -92,7 +92,7 @@ def remove_worker(self, scheduler=None, worker=None, **kwargs):


@docstrings.get_sectionsf('JobQueueCluster')
class JobQueueCluster(Cluster):
class JobQueueCluster(ClusterManager):
""" Base class to launch Dask Clusters for Job queues

This class should not be used directly, use inherited class appropriate for your queueing system (e.g. PBScluster
Expand Down Expand Up @@ -179,6 +179,8 @@ def __init__(self,
# """
# This initializer should be considered as Abstract, and never used directly.
# """
super(JobQueueCluster, self).__init__()

if threads is not None:
raise ValueError(threads_deprecation_message)

Expand Down Expand Up @@ -263,8 +265,6 @@ def __init__(self,

self._command_template = ' '.join(map(str, command_args))

self._target_scale = 0

self.log_directory = log_directory
if self.log_directory is not None:
if not os.path.exists(self.log_directory):
Expand Down Expand Up @@ -413,21 +413,12 @@ def scale_up(self, n, **kwargs):
active_and_pending = self._count_active_and_pending_workers()
if n >= active_and_pending:
logger.debug("Scaling up to %d workers.", n)
self.start_workers(n - self._count_active_and_pending_workers())
self.start_workers(n - active_and_pending)
else:
n_to_close = active_and_pending - n
if n_to_close < self._count_pending_workers():
# We only need to kill some pending jobs, this is actually a
# scale down bu could not be handled upstream
to_kill = int(n_to_close / self.worker_processes)
jobs = list(self.pending_jobs.keys())[to_kill:]
self.stop_jobs(jobs)
else:
# We should not end here, a new scale call should not begin
# until a scale_up or scale_down has ended
raise RuntimeError('JobQueueCluster.scale_up was called with'
' a number of worker lower than the '
'currently connected workers')
# scale_up should not be called if n < active + pending jobs
logger.warning('JobQueueCluster.scale_up was called with a'
' number of workers lower that what is already'
' running or pending')

def _count_active_and_pending_workers(self):
active_and_pending = (self._count_active_workers() +
Expand All @@ -444,17 +435,34 @@ def _count_active_workers(self):
def _count_pending_workers(self):
return self.worker_processes * len(self.pending_jobs)

def scale_down(self, workers):
def scale_down(self, workers, n=None):
''' Close the workers with the given addresses '''
logger.debug("Scaling down. Workers: %s", workers)
worker_states = []
for w in workers:
try:
# Get the actual WorkerState
worker_states.append(self.scheduler.workers[w])
except KeyError:
logger.debug('worker %s is already gone', w)
self.stop_workers(worker_states)
if n is None:
# Adaptive currently calls directly scale_down, we need to handle this
# Need to only keep active workers minus those adaptive wants to stop
n = self._count_active_workers() - len(workers)
logger.debug("Scaling down to %d Workers: %s", n, workers)
active_and_pending = self._count_active_and_pending_workers()
n_to_close = active_and_pending - n
if n_to_close < 0:
logger.warning('JobQueueCluster.scale_down was called with'
' a number of worker greater than what is'
' already running or pending.')
elif n_to_close <= self._count_pending_workers():
# We only need to kill some pending jobs,
to_kill = int(n_to_close / self.worker_processes)
jobs = list(self.pending_jobs.keys())[-to_kill:]
logger.debug("%d jobs to stop, stoppubg jobs %s", to_kill, jobs)
self.stop_jobs(jobs)
else:
worker_states = []
for w in workers:
try:
# Get the actual WorkerState
worker_states.append(self.scheduler.workers[w])
except KeyError:
logger.debug('worker %s is already gone', w)
self.stop_workers(worker_states)

def stop_all_jobs(self):
''' Stops all running and pending jobs '''
Expand Down Expand Up @@ -492,7 +500,7 @@ def _job_id_from_submit_output(self, out):
job_id = match.groupdict().get('job_id')
if job_id is None:
msg = ("You need to use a 'job_id' named group in your regexp, e.g. "
"r'(?P<job_id>\d+)', in your regexp. Your regexp was: "
"r'(?P<job_id>\\d+)', in your regexp. Your regexp was: "
"{!r}".format(self.job_id_regexp))
raise ValueError(msg)

Expand Down
5 changes: 4 additions & 1 deletion dask_jobqueue/tests/test_pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ def test_basic(loop):


@pytest.mark.env("pbs") # noqa: F811
@pytest.mark.skip(reason="Current scale method not capable of doing this")
def test_basic_scale_edge_cases(loop):
with PBSCluster(walltime='00:02:00', processes=1, cores=2, memory='2GB', local_directory='/tmp',
job_extra=['-V'], loop=loop) as cluster:
Expand All @@ -129,6 +128,10 @@ def test_basic_scale_edge_cases(loop):

# Wait to see what happens
sleep(0.2)
start = time()
while cluster.pending_jobs or cluster.running_jobs:
sleep(0.1)
assert time() < start + QUEUE_WAIT

assert not(cluster.pending_jobs or cluster.running_jobs)

Expand Down