diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py
index 3bc2b2d9124..21f5644ba02 100644
--- a/distributed/deploy/cluster.py
+++ b/distributed/deploy/cluster.py
@@ -1,12 +1,14 @@
 import logging
 import os
+import math
 from weakref import ref
 
 import dask
 
 from .adaptive import Adaptive
-from ..utils import format_bytes, PeriodicCallback, log_errors, ignoring
+from ..utils import format_bytes, PeriodicCallback, log_errors, ignoring, \
+    parse_bytes
 
 logger = logging.getLogger(__name__)
 
 
@@ -27,6 +29,10 @@ def scale_up(self, n: int):
 
         def scale_down(self, workers: List[str]):
             ''' Close the workers with the given addresses '''
+    3.  A ``worker_info`` dict attribute, needed if ``scale(cores=...)`` or
+        ``scale(memory=...)`` is to be usable by users.
+
+        cluster.worker_info = {'cores': 4, 'memory': '16 GB'}
 
     This will provide a general ``scale`` method as well as an IPython widget
     for display.
@@ -42,28 +48,62 @@ def scale_down(self, workers: List[str]):
     ...     def scale_down(self, workers):
     ...         ''' Close the workers with the given addresses '''
     ...         pass
+    ...     worker_info = {'cores': 4, 'memory': '16 GB'}
 
     >>> cluster = MyCluster()
     >>> cluster.scale(5)                       # scale manually
     >>> cluster.adapt(minimum=1, maximum=100)  # scale automatically
+    >>> cluster.scale(cores=100)               # scale manually to a number of cores
 
     See Also
     --------
     LocalCluster: a simple implementation with local workers
     """
-    def adapt(self, **kwargs):
+    def adapt(self, minimum_cores=None, maximum_cores=None,
+              minimum_memory=None, maximum_memory=None, **kwargs):
         """ Turn on adaptivity
 
         For keyword arguments see dask.distributed.Adaptive
 
+        Instead of the minimum and maximum parameters, which apply to the number
+        of workers, if the Cluster object implements a worker_info attribute,
+        one can use the following parameters:
+
+        Parameters
+        ----------
+        minimum_cores: int
+            Minimum number of cores for the cluster
+        maximum_cores: int
+            Maximum number of cores for the cluster
+        minimum_memory: str
+            Minimum amount of memory for the cluster
+        maximum_memory: str
+            Maximum amount of memory for the cluster
+
+
         Examples
         --------
         >>> cluster.adapt(minimum=0, maximum=10, interval='500ms')
+        >>> cluster.adapt(minimum_cores=24, maximum_cores=96)
+        >>> cluster.adapt(minimum_memory='60 GB', maximum_memory='1 TB')
         """
         with ignoring(AttributeError):
             self._adaptive.stop()
         if not hasattr(self, '_adaptive_options'):
             self._adaptive_options = {}
+
+        if 'minimum' not in kwargs:
+            if minimum_cores is not None:
+                kwargs['minimum'] = self._get_nb_workers_from_cores(minimum_cores)
+            elif minimum_memory is not None:
+                kwargs['minimum'] = self._get_nb_workers_from_memory(minimum_memory)
+
+        if 'maximum' not in kwargs:
+            if maximum_cores is not None:
+                kwargs['maximum'] = self._get_nb_workers_from_cores(maximum_cores)
+            elif maximum_memory is not None:
+                kwargs['maximum'] = self._get_nb_workers_from_memory(maximum_memory)
+
         self._adaptive_options.update(kwargs)
         self._adaptive = Adaptive(self.scheduler, self, **self._adaptive_options)
         return self._adaptive
@@ -79,24 +119,46 @@ def dashboard_link(self):
         port = self.scheduler.services['bokeh'].port
         return template.format(host=host, port=port, **os.environ)
 
-    def scale(self, n):
-        """ Scale cluster to n workers
+    def scale(self, n=None, cores=None, memory=None):
+        """ Scale cluster to n workers or to the given number of cores or
+        memory
+
+        The number of cores and the memory are converted into a number of
+        workers using the worker_info attribute.
 
         Parameters
         ----------
         n: int
             Target number of workers
+        cores: int
+            Target number of cores
+        memory: str
+            Target amount of available memory
 
         Example
         -------
         >>> cluster.scale(10)  # scale cluster to ten workers
+        >>> cluster.scale(cores=100)  # scale cluster to 100 cores
+        >>> cluster.scale(memory='1 TB')  # scale cluster to 1 TB memory
 
         See Also
         --------
         Cluster.scale_up
         Cluster.scale_down
+        Cluster.worker_info
         """
         with log_errors():
+            if [n, cores, memory].count(None) != 2:
+                raise ValueError('One and only one of n, cores, memory kwargs'
+                                 ' should be used, n={}, cores={}, memory={}'
+                                 ' provided.'.format(n, cores, memory))
+
+            if n is None:
+                if cores is not None:
+                    n = self._get_nb_workers_from_cores(cores)
+                elif memory is not None:
+                    n = self._get_nb_workers_from_memory(memory)
+
             if n >= len(self.scheduler.workers):
                 self.scheduler.loop.add_callback(self.scale_up, n)
             else:
@@ -142,7 +204,7 @@ def _widget(self):
         except AttributeError:
             pass
 
-        from ipywidgets import Layout, VBox, HBox, IntText, Button, HTML, Accordion
+        from ipywidgets import Layout, VBox, HBox, IntText, Button, HTML, Accordion, Text
 
         layout = Layout(width='150px')
 
@@ -160,13 +222,31 @@ def _widget(self):
 
         request = IntText(0, description='Workers', layout=layout)
         scale = Button(description='Scale', layout=layout)
+        request_cores = IntText(0, description='Cores', layout=layout)
+        scale_cores = Button(description='Scale', layout=layout)
+        request_memory = Text('0 GB', description='Memory', layout=layout)
+        scale_memory = Button(description='Scale', layout=layout)
 
         minimum = IntText(0, description='Minimum', layout=layout)
         maximum = IntText(0, description='Maximum', layout=layout)
         adapt = Button(description='Adapt', layout=layout)
-
-        accordion = Accordion([HBox([request, scale]),
-                               HBox([minimum, maximum, adapt])],
+        minimum_cores = IntText(0, description='Min cores', layout=layout)
+        maximum_cores = IntText(0, description='Max cores', layout=layout)
+        adapt_cores = Button(description='Adapt', layout=layout)
+        minimum_mem = Text('0 GB', description='Min memory', layout=layout)
+        maximum_mem = Text('0 GB', description='Max memory', layout=layout)
+        adapt_mem = Button(description='Adapt', layout=layout)
+
+        scale_hbox = [HBox([request, scale])]
+        adapt_hbox = [HBox([minimum, maximum, adapt])]
+        if hasattr(self, 'worker_info'):
+            scale_hbox.append(HBox([request_cores, scale_cores]))
+            scale_hbox.append(HBox([request_memory, scale_memory]))
+            adapt_hbox.append(HBox([minimum_cores, maximum_cores, adapt_cores]))
+            adapt_hbox.append(HBox([minimum_mem, maximum_mem, adapt_mem]))
+
+        accordion = Accordion([VBox(scale_hbox),
+                               VBox(adapt_hbox)],
                               layout=Layout(min_width='500px'))
         accordion.selected_index = None
         accordion.set_title(0, 'Manual Scaling')
@@ -182,16 +262,30 @@ def _widget(self):
         def adapt_cb(b):
             self.adapt(minimum=minimum.value, maximum=maximum.value)
 
-        adapt.on_click(adapt_cb)
+        def adapt_cores_cb(b):
+            self.adapt(minimum_cores=minimum_cores.value, maximum_cores=maximum_cores.value)
 
-        def scale_cb(b):
-            with log_errors():
-                n = request.value
-                with ignoring(AttributeError):
-                    self._adaptive.stop()
-                self.scale(n)
+        def adapt_mem_cb(b):
+            self.adapt(minimum_memory=minimum_mem.value, maximum_memory=maximum_mem.value)
 
-        scale.on_click(scale_cb)
+        adapt.on_click(adapt_cb)
+        adapt_cores.on_click(adapt_cores_cb)
+        adapt_mem.on_click(adapt_mem_cb)
+
+        def scale_cb(request, kwarg):
+            def request_cb(b):
+                with log_errors():
+                    arg = request.value
+                    with ignoring(AttributeError):
+                        self._adaptive.stop()
+                    local_kwargs = dict()
+                    local_kwargs[kwarg] = arg
+                    self.scale(**local_kwargs)
+            return request_cb
+
+        scale.on_click(scale_cb(request, 'n'))
+        scale_cores.on_click(scale_cb(request_cores, 'cores'))
+        scale_memory.on_click(scale_cb(request_memory, 'memory'))
 
         scheduler_ref = ref(self.scheduler)
 
@@ -206,3 +300,21 @@ def update():
 
     def _ipython_display_(self, **kwargs):
         return self._widget()._ipython_display_(**kwargs)
+
+    def _get_nb_workers_from_cores(self, cores):
+        """ Number of workers (rounded up) needed to provide ``cores`` cores """
+        self._raise_exception_if_not_worker_info()
+        return int(math.ceil(float(cores) / self.worker_info['cores']))
+
+    def _get_nb_workers_from_memory(self, memory):
+        """ Number of workers (rounded up) needed to provide ``memory`` bytes """
+        self._raise_exception_if_not_worker_info()
+        return int(math.ceil(float(parse_bytes(memory)) / parse_bytes(self.worker_info['memory'])))
+
+    def _raise_exception_if_not_worker_info(self):
+        """ Raise NotImplementedError if the cluster defines no worker_info """
+        if not hasattr(self, 'worker_info'):
+            raise NotImplementedError('{} class does not provide worker_info '
+                                      'attribute, needed for scaling with '
+                                      'cores or memory kwargs.'
+                                      .format(self.__class__.__name__))
diff --git a/distributed/deploy/tests/test_cluster.py b/distributed/deploy/tests/test_cluster.py
new file mode 100644
index 00000000000..d5b2d35ef54
--- /dev/null
+++ b/distributed/deploy/tests/test_cluster.py
@@ -0,0 +1,185 @@
+from __future__ import print_function, division, absolute_import
+
+from time import sleep
+
+import pytest
+from toolz import frequencies, pluck
+from tornado import gen
+from tornado.ioloop import IOLoop
+
+from distributed import Client, LocalCluster
+from distributed.utils_test import gen_test, slowinc
+from distributed.utils_test import loop, nodebug  # noqa: F401
+from distributed.metrics import time
+
+
+def test_adaptive_local_cluster_adapt(loop):
+    with LocalCluster(0, scheduler_port=0, silence_logs=False,
+                      diagnostics_port=None, loop=loop) as cluster:
+        cluster.adapt(interval=100)
+        with Client(cluster, loop=loop) as c:
+            assert not c.ncores()
+            future = c.submit(lambda x: x + 1, 1)
+            assert future.result() == 2
+            assert c.ncores()
+
+            sleep(0.1)
+            assert c.ncores()  # still there after some time
+
+            del future
+
+            start = time()
+            while cluster.scheduler.ncores:
+                sleep(0.01)
+                assert time() < start + 5
+
+            assert not c.ncores()
+
+
+@gen_test(timeout=30)
+def test_min_max():
+    loop = IOLoop.current()
+    cluster = yield LocalCluster(0, scheduler_port=0, silence_logs=False,
+                                 processes=False, diagnostics_port=None,
+                                 loop=loop, asynchronous=True,
+                                 threads_per_worker=2)
+    yield cluster._start()
+    try:
+        adapt = cluster.adapt(minimum=1, maximum=2, interval='20 ms', wait_count=10)
+        c = yield Client(cluster, asynchronous=True, loop=loop)
+
+        start = time()
+        while not cluster.scheduler.workers:
+            yield gen.sleep(0.01)
+            assert time() < start + 1
+
+        yield gen.sleep(0.2)
+        assert len(cluster.scheduler.workers) == 1
+        assert frequencies(pluck(1, adapt.log)) == {'up': 1}
+
+        futures = c.map(slowinc, range(100), delay=0.1)
+
+        start = time()
+        while len(cluster.scheduler.workers) < 2:
+            yield gen.sleep(0.01)
+            assert time() < start + 1
+
+        assert len(cluster.scheduler.workers) == 2
+        yield gen.sleep(0.5)
+        assert len(cluster.scheduler.workers) == 2
+        assert len(cluster.workers) == 2
+        assert frequencies(pluck(1, adapt.log)) == {'up': 2}
+
+        del futures
+
+        start = time()
+        while len(cluster.scheduler.workers) != 1:
+            yield gen.sleep(0.01)
+            assert time() < start + 2
+        assert frequencies(pluck(1, adapt.log)) == {'up': 2, 'down': 1}
+    finally:
+        yield c._close()
+        yield cluster._close()
+
+
+@gen_test(timeout=30)
+def test_min_max_cores():
+    loop = IOLoop.current()
+    cluster = yield LocalCluster(0, scheduler_port=0, silence_logs=False,
+                                 processes=False, diagnostics_port=None,
+                                 loop=loop, asynchronous=True,
+                                 threads_per_worker=2)
+    cluster.worker_info = {'cores': 2, 'memory': '256 MB'}
+    yield cluster._start()
+    try:
+        adapt = cluster.adapt(minimum_cores=2, maximum_cores=4, interval='20 ms', wait_count=10)
+        c = yield Client(cluster, asynchronous=True, loop=loop)
+
+        start = time()
+        while not cluster.scheduler.workers:
+            yield gen.sleep(0.01)
+            assert time() < start + 1
+
+        yield gen.sleep(0.2)
+        assert len(cluster.scheduler.workers) == 1
+        assert frequencies(pluck(1, adapt.log)) == {'up': 1}
+
+        futures = c.map(slowinc, range(100), delay=0.1)
+
+        start = time()
+        while len(cluster.scheduler.workers) < 2:
+            yield gen.sleep(0.01)
+            assert time() < start + 1
+
+        assert len(cluster.scheduler.workers) == 2
+        yield gen.sleep(0.5)
+        assert len(cluster.scheduler.workers) == 2
+        assert len(cluster.workers) == 2
+        assert frequencies(pluck(1, adapt.log)) == {'up': 2}
+
+        del futures
+
+        start = time()
+        while len(cluster.scheduler.workers) != 1:
+            yield gen.sleep(0.01)
+            assert time() < start + 2
+        assert frequencies(pluck(1, adapt.log)) == {'up': 2, 'down': 1}
+    finally:
+        yield c._close()
+        yield cluster._close()
+
+
+@gen_test(timeout=30)
+def test_scale_cores_and_memory():
+    loop = IOLoop.current()
+    cluster = yield LocalCluster(0, scheduler_port=0, silence_logs=False,
+                                 processes=False, diagnostics_port=None,
+                                 loop=loop, asynchronous=True)
+    cluster.worker_info = {'cores': 2, 'memory': '256 MB'}
+    yield cluster._start()
+    try:
+        cluster.scale(cores=4)
+        c = yield Client(cluster, asynchronous=True, loop=loop)
+
+        start = time()
+        while len(cluster.scheduler.workers) < 2:
+            yield gen.sleep(0.01)
+            assert time() < start + 1
+
+        assert len(cluster.scheduler.workers) == 2
+        yield gen.sleep(0.5)
+        assert len(cluster.scheduler.workers) == 2
+        assert len(cluster.workers) == 2
+
+        cluster.scale(memory='0 B')
+        start = time()
+        while len(cluster.scheduler.workers) != 0:
+            yield gen.sleep(0.01)
+            assert time() < start + 2
+
+    finally:
+        yield c._close()
+        yield cluster._close()
+
+
+@gen_test(timeout=30)
+def test_scale_cores_error():
+    loop = IOLoop.current()
+    cluster = yield LocalCluster(0, scheduler_port=0, silence_logs=False,
+                                 processes=False, diagnostics_port=None,
+                                 loop=loop, asynchronous=True)
+    yield cluster._start()
+    try:
+        with pytest.raises(NotImplementedError):
+            cluster.scale(cores=10)
+
+        with pytest.raises(NotImplementedError):
+            cluster.scale(memory='10TB')
+
+        with pytest.raises(NotImplementedError):
+            cluster.adapt(minimum_cores=4)
+
+        with pytest.raises(ValueError):
+            cluster.scale(10, cores=20)
+    finally:
+        yield cluster._close()