Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 125 additions & 16 deletions distributed/deploy/cluster.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import logging
import os
import math
from weakref import ref

import dask

from .adaptive import Adaptive

from ..utils import format_bytes, PeriodicCallback, log_errors, ignoring
from ..utils import format_bytes, PeriodicCallback, log_errors, ignoring, \
parse_bytes

logger = logging.getLogger(__name__)

Expand All @@ -27,6 +29,10 @@ def scale_up(self, n: int):

def scale_down(self, workers: List[str]):
''' Close the workers with the given addresses '''
3. worker_info dict attribute if scale(cores=...) or scale(memory=...)
can be used by users.

cluster.worker_info = {'cores':4 , 'memory':'16 GB'}

This will provide a general ``scale`` method as well as an IPython widget
for display.
Expand All @@ -42,28 +48,62 @@ def scale_down(self, workers: List[str]):
... def scale_down(self, workers):
... ''' Close the workers with the given addresses '''
... pass
... worker_info = {'cores': 4, 'memory': '16 GB'}

>>> cluster = MyCluster()
>>> cluster.scale(5) # scale manually
>>> cluster.adapt(minimum=1, maximum=100) # scale automatically
>>> cluster.scale(cores=100) # scale manually to cores nb

See Also
--------
LocalCluster: a simple implementation with local workers
"""
def adapt(self, **kwargs):
def adapt(self, minimum_cores=None, maximum_cores=None,
minimum_memory=None, maximum_memory=None, **kwargs):
""" Turn on adaptivity

For keyword arguments see dask.distributed.Adaptive

Instead of minimum and maximum parameters which apply to the number of
worker, If Cluster object implements worker_info attribute, one can use
the following parameters:

Parameters
----------
minimum_cores: int
Minimum number of cores for the cluster
maximum_cores: int
Maximum number of cores for the cluster
minimum_memory: str
Minimum amount of memory for the cluster
maximum_memory: str
Maximum amount of memory for the cluster


Examples
--------
>>> cluster.adapt(minimum=0, maximum=10, interval='500ms')
>>> cluster.adapt(minimum_cores=24, maximum_cores=96)
>>> cluster.adapt(minimum_memory='60 GB', maximum_memory= '1 TB')
"""
with ignoring(AttributeError):
self._adaptive.stop()
if not hasattr(self, '_adaptive_options'):
self._adaptive_options = {}

if 'minimum' not in kwargs:
if minimum_cores is not None:
kwargs['minimum'] = self._get_nb_workers_from_cores(minimum_cores)
elif minimum_memory is not None:
kwargs['minimum'] = self._get_nb_workers_from_memory(minimum_memory)

if 'maximum' not in kwargs:
if maximum_cores is not None:
kwargs['maximum'] = self._get_nb_workers_from_cores(maximum_cores)
elif maximum_memory is not None:
kwargs['maximum'] = self._get_nb_workers_from_memory(maximum_memory)

self._adaptive_options.update(kwargs)
self._adaptive = Adaptive(self.scheduler, self, **self._adaptive_options)
return self._adaptive
Expand All @@ -79,24 +119,46 @@ def dashboard_link(self):
port = self.scheduler.services['bokeh'].port
return template.format(host=host, port=port, **os.environ)

def scale(self, n):
""" Scale cluster to n workers
def scale(self, n=None, cores=None, memory=None):
""" Scale cluster to n workers or to the given number of cores or
memory

number of cores and memory are converted into number of workers using
worker_info attribute.

Parameters
----------
n: int
Target number of workers
cores: int
Target number of cores
memory: str
Target amount of available memory

Example
-------
>>> cluster.scale(10) # scale cluster to ten workers
>>> cluster.scale(cores=100) # scale cluster to 100 cores
>>> cluster.scale(memory='1 TB') # scale cluster to 1 TB memory

See Also
--------
Cluster.scale_up
Cluster.scale_down
Cluster.worker_info
"""
with log_errors():
if [n, cores, memory].count(None) != 2:
raise ValueError('One and only one of n, cores, memory kwargs'
' should be used, n={}, cores={}, memory={}'
' provided.'.format(n, cores, memory))

if n is None:
if cores is not None:
n = self._get_nb_workers_from_cores(cores)
elif memory is not None:
n = self._get_nb_workers_from_memory(memory)

if n >= len(self.scheduler.workers):
self.scheduler.loop.add_callback(self.scale_up, n)
else:
Expand Down Expand Up @@ -142,7 +204,7 @@ def _widget(self):
except AttributeError:
pass

from ipywidgets import Layout, VBox, HBox, IntText, Button, HTML, Accordion
from ipywidgets import Layout, VBox, HBox, IntText, Button, HTML, Accordion, Text

layout = Layout(width='150px')

Expand All @@ -160,13 +222,31 @@ def _widget(self):

request = IntText(0, description='Workers', layout=layout)
scale = Button(description='Scale', layout=layout)
request_cores = IntText(0, description='Cores', layout=layout)
scale_cores = Button(description='Scale', layout=layout)
request_memory = Text('O GB', description='Memory', layout=layout)
scale_memory = Button(description='Scale', layout=layout)

minimum = IntText(0, description='Minimum', layout=layout)
maximum = IntText(0, description='Maximum', layout=layout)
adapt = Button(description='Adapt', layout=layout)

accordion = Accordion([HBox([request, scale]),
HBox([minimum, maximum, adapt])],
minimum_cores = IntText(0, description='Min cores', layout=layout)
maximum_cores = IntText(0, description='Max cores', layout=layout)
adapt_cores = Button(description='Adapt', layout=layout)
minimum_mem = Text('0 GB', description='Min memory', layout=layout)
maximum_mem = Text('0 GB', description='Max memory', layout=layout)
adapt_mem = Button(description='Adapt', layout=layout)

scale_hbox = [HBox([request, scale])]
adapt_hbox = [HBox([minimum, maximum, adapt])]
if hasattr(self, 'worker_info'):
scale_hbox.append(HBox([request_cores, scale_cores]))
scale_hbox.append(HBox([request_memory, scale_memory]))
adapt_hbox.append(HBox([minimum_cores, maximum_cores, adapt_cores]))
adapt_hbox.append(HBox([minimum_mem, maximum_mem, adapt_mem]))

accordion = Accordion([VBox(scale_hbox),
VBox(adapt_hbox)],
layout=Layout(min_width='500px'))
accordion.selected_index = None
accordion.set_title(0, 'Manual Scaling')
Expand All @@ -182,16 +262,30 @@ def _widget(self):
def adapt_cb(b):
self.adapt(minimum=minimum.value, maximum=maximum.value)

adapt.on_click(adapt_cb)
def adapt_cores_cb(b):
self.adapt(minimum_cores=minimum_cores.value, maximum_cores=maximum_cores.value)

def scale_cb(b):
with log_errors():
n = request.value
with ignoring(AttributeError):
self._adaptive.stop()
self.scale(n)
def adapt_mem_cb(b):
self.adapt(minimum_memory=minimum_mem.value, maximum_memory=maximum_mem.value)

scale.on_click(scale_cb)
adapt.on_click(adapt_cb)
adapt_cores.on_click(adapt_cores_cb)
adapt_mem.on_click(adapt_mem_cb)

def scale_cb(request, kwarg):
def request_cb(b):
with log_errors():
arg = request.value
with ignoring(AttributeError):
self._adaptive.stop()
local_kwargs = dict()
local_kwargs[kwarg] = arg
self.scale(**local_kwargs)
return request_cb

scale.on_click(scale_cb(request, 'n'))
scale_cores.on_click(scale_cb(request_cores, 'cores'))
scale_memory.on_click(scale_cb(request_memory, 'memory'))

scheduler_ref = ref(self.scheduler)

Expand All @@ -206,3 +300,18 @@ def update():

def _ipython_display_(self, **kwargs):
return self._widget()._ipython_display_(**kwargs)

def _get_nb_workers_from_cores(self, cores):
self._raise_exception_if_not_worker_info()
return math.ceil(cores / self.worker_info['cores'])

def _get_nb_workers_from_memory(self, memory):
self._raise_exception_if_not_worker_info()
return math.ceil(parse_bytes(memory) / parse_bytes(self.worker_info['memory']))

def _raise_exception_if_not_worker_info(self):
if not hasattr(self, 'worker_info'):
raise NotImplementedError('{} class does not provide worker_info '
'attribute, needed for scaling with '
'cores or memory kwargs.'
.format(self.__class__.__name__))
Loading