From 3d5b6a53ab679fc369e0bb82c8a59dd168ed8610 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Tue, 27 Mar 2018 08:04:29 -0400 Subject: [PATCH] Inherit from distributed.deploy.Cluster --- dask_jobqueue/core.py | 19 ++----------------- requirements.txt | 2 ++ setup.py | 2 +- 3 files changed, 5 insertions(+), 18 deletions(-) create mode 100644 requirements.txt diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 771c2a2a..c1d8f013 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -8,6 +8,7 @@ from distributed.utils import tmpfile, ignoring, get_ip_interface, parse_bytes from distributed import LocalCluster +from distributed.deploy import Cluster dirname = os.path.dirname(sys.executable) @@ -16,7 +17,7 @@ @docstrings.get_sectionsf('JobQueueCluster') -class JobQueueCluster(object): +class JobQueueCluster(Cluster): """ Base class to launch Dask Clusters for Job queues This class should not be used directly, use inherited class appropriate @@ -153,10 +154,6 @@ def start_workers(self, n=1): def scheduler(self): return self.cluster.scheduler - @property - def scheduler_address(self): - return self.cluster.scheduler_address - def _calls(self, cmds): """ Call a command using subprocess.communicate @@ -223,15 +220,3 @@ def __enter__(self): def __exit__(self, type, value, traceback): self.stop_workers(self.jobs) self.cluster.__exit__(type, value, traceback) - - def adapt(self): - """ Start up an Adaptive deployment if not already started - - This makes the cluster request resources in accordance to current - demand on the scheduler """ - from distributed.deploy import Adaptive - if self._adaptive: - return - else: - self._adaptive = Adaptive(self.scheduler, self, startup_cost=5, - key=lambda ws: ws.host) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..50ebbec0 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +distributed>=1.21.3 +docrep diff --git a/setup.py b/setup.py index c5234520..8cbfe7d8 100755 --- a/setup.py +++ b/setup.py @@ -9,6 +9,6 @@ url='https://github.com/dask/dask-jobqueue', license='BSD 3-Clause', packages=['dask_jobqueue'], + install_requires=open('requirements.txt').read().strip().split('\n'), long_description=(open('README.rst').read() if exists('README.rst') else ''), - install_requires=['docrep'], zip_safe=False)