From 524c34a8ac6aa7e0d73aa7942da8d74743cdc4c5 Mon Sep 17 00:00:00 2001 From: Guillaume Date: Sun, 11 Mar 2018 17:10:51 +0100 Subject: [PATCH 01/10] WIP on class abstraction --- dask_jobqueue/core.py | 32 +++++++++++++++++++++++++++++-- dask_jobqueue/pbs.py | 44 +++++++++++++++++++------------------------ 2 files changed, 49 insertions(+), 27 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index f0ea6dc7..15229fc0 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -2,8 +2,10 @@ import logging import subprocess import toolz +import socket -from distributed.utils import tmpfile, ignoring +from distributed.utils import tmpfile, ignoring, get_ip_interface +from distributed import LocalCluster logger = logging.getLogger(__name__) @@ -19,9 +21,35 @@ class JobQueueCluster(object): PBSCluster SLURMCluster """ - def __init__(self): + def __new__(cls, *args, **kwargs): + #Prevent class instantiation + print(str(cls)) raise NotImplemented + def __init__(self, + threads_per_worker=4, + processes=6, + memory='20GB', + interface=None, + death_timeout=60, + worker_extra='', + **kwargs + ): + + + if interface: + host = get_ip_interface(interface) + worker_extra += ' --interface %s ' % interface + else: + host = socket.gethostname() + + self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) + self.memory = memory.replace(' ', '') + + self.jobs = dict() + self.n = 0 + self._adaptive = None + def job_script(self): self.n += 1 return self._template % toolz.merge(self.config, {'n': self.n}) diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index cf634cbd..ff0053a1 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -1,6 +1,5 @@ import logging import os -import socket import sys from distributed import LocalCluster @@ -18,7 +17,7 @@ class PBSCluster(JobQueueCluster): Parameters ---------- - name : str + job_name : str Name of worker jobs. Passed to `$PBS -N` option. queue : str Destination queue for each worker job. Passed to `#PBS -q` option. @@ -41,7 +40,9 @@ class PBSCluster(JobQueueCluster): Network interface like 'eth0' or 'ib0'. death_timeout : float Seconds to wait for a scheduler before closing workers - extra : str + job_extra : list + Additional lines to put in PBS script header (usually starting with #PBS comment) + worker_extra : str Additional arguments to pass to `dask-worker` kwargs : dict Additional keyword arguments to pass to `LocalCluster` @@ -61,17 +62,18 @@ class PBSCluster(JobQueueCluster): >>> cluster.adapt() """ def __init__(self, - name='dask', - queue='regular', + name='dask-worker', + queue=None, project=None, - resource_spec='select=1:ncpus=36:mem=109GB', - threads_per_worker=4, - processes=9, - memory='7GB', + resource_spec='select=1:ncpus=24:mem=120GB', walltime='00:30:00', + pbs_extra=[], + threads_per_worker=4, + processes=6, + memory='20GB', interface=None, death_timeout=60, - extra='', + worker_extra='', **kwargs): self._template = """ #!/bin/bash @@ -92,18 +94,12 @@ def __init__(self, %(extra)s """.lstrip() - if interface: - host = get_ip_interface(interface) - extra += ' --interface %s ' % interface - else: - host = socket.gethostname() - project = project or os.environ.get('PBS_ACCOUNT') - if not project: - raise ValueError("Must specify a project like `project='UCLB1234' " - "or set PBS_ACCOUNT environment variable") - self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) - memory = memory.replace(' ', '') + + super(PBSCluster, self).__init__(threads_per_worker=threads_per_worker,processes=processes, + memory=memory,interface=interface,death_timeout=death_timeout,worker_extra=worker_extra, + **kwargs) + self.config = {'name': name, 'queue': queue, 'project': project, @@ -115,10 +111,8 @@ def __init__(self, 'base_path': dirname, 'memory': memory, 'death_timeout': death_timeout, - 'extra': extra} - self.jobs = dict() - self.n = 0 - self._adaptive = None + 'extra': worker_extra} + self._submitcmd = 'qsub' self._cancelcmd = 'qdel' From 7017d91a5cf041edbc9fbb7530dd448cbbdf08e0 Mon Sep 17 00:00:00 2001 From: Guillaume Date: Sun, 11 Mar 2018 23:55:18 +0100 Subject: [PATCH 02/10] WIP: Refactoring of attributes, script building, and class inheritance --- dask_jobqueue/core.py | 105 ++++++++++++++++++++++++++++++++++----- dask_jobqueue/pbs.py | 110 ++++++++++++++++++----------------------- dask_jobqueue/slurm.py | 78 +++++++++++------------------ 3 files changed, 169 insertions(+), 124 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 15229fc0..bce0683e 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -1,12 +1,15 @@ from contextlib import contextmanager import logging import subprocess -import toolz import socket +import os +import sys from distributed.utils import tmpfile, ignoring, get_ip_interface from distributed import LocalCluster +dirname = os.path.dirname(sys.executable) + logger = logging.getLogger(__name__) @@ -16,43 +19,119 @@ class JobQueueCluster(object): This class should not be used directly, use inherited class appropriate for your queueing system (e.g. PBScluster or SLURMCluster) + Abstract init parameters + ------------------------ + name : str + Name of Dask workers. + threads : int + Number of threads per process. + processes : int + Number of processes per node. + memory : str + Bytes of memory that the worker can use. This should be a string + like "7GB" that can be interpretted both by PBS and Dask. + interface : str + Network interface like 'eth0' or 'ib0'. + death_timeout : float + Seconds to wait for a scheduler before closing workers + local_directory : str + Dask worker local directory for file spilling. + extra : str + Additional arguments to pass to `dask-worker` + kwargs : dict + Additional keyword arguments to pass to `LocalCluster` + See Also -------- PBSCluster SLURMCluster """ - def __new__(cls, *args, **kwargs): - #Prevent class instantiation - print(str(cls)) - raise NotImplemented + + _script_template =""" +#!/bin/bash + +%(job_header)s + +%(worker_command)s +""".lstrip() def __init__(self, - threads_per_worker=4, + name='dask-worker', + threads=4, processes=6, - memory='20GB', + memory='16GB', interface=None, death_timeout=60, - worker_extra='', + local_directory=None, + extra='', **kwargs ): + """ + This initializer should be considered as Abstract, and never used directly. + :param name: + :param threads: + :param processes: + :param memory: + :param interface: + :param death_timeout: + :param local_directory: + :param extra: + :param kwargs: + """ if interface: host = get_ip_interface(interface) - worker_extra += ' --interface %s ' % interface + extra += ' --interface %s ' % interface else: host = socket.gethostname() self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) - self.memory = memory.replace(' ', '') self.jobs = dict() self.n = 0 self._adaptive = None + self._command_template = "%s/dask-worker %s" % (dirname, self.scheduler.address) + if threads != None: self._command_template += " --nthreads %d" % threads + if processes != None: self._command_template += " --nprocs %d" % processes + if memory != None: self._command_template += " --memory-limit %s" % memory + if name != None: + self._command_template += " --name %s" % name + self._command_template += "-%(n)d" #Keep %(n) to be replaced later. + if death_timeout != None: self._command_template += " --death-timeout %s" % death_timeout + if local_directory != None: self._command_template += " --local-directory %s" % local_directory + if extra!= None: self._command_template += extra + def job_script(self): self.n += 1 - return self._template % toolz.merge(self.config, {'n': self.n}) + return self._script_template % {'job_header': self.job_header, + 'worker_command': self._command_template % {'n': self.n} + } + + @property + def job_header(self): + """ + Abstract attribute for the Job scheduler script header part. + :return: A string representation of the Job script header part. + """ + raise NotImplementedError + + @property + def submitcmd(self): + """ + Abstract attribute for job scheduler submission command + :return: + """ + raise NotImplementedError + + @property + def cancelcmd(self): + """ + Abstract attribute for job scheduler cancel command + :return: + """ + raise NotImplementedError @contextmanager def job_file(self): @@ -67,7 +146,7 @@ def start_workers(self, n=1): workers = [] for _ in range(n): with self.job_file() as fn: - out = self._call([self._submitcmd, fn]) + out = self._call([self.submitcmd, fn]) job = out.decode().split('.')[0] self.jobs[self.n] = job workers.append(self.n) @@ -127,7 +206,7 @@ def stop_workers(self, workers): return workers = list(map(int, workers)) jobs = [self.jobs[w] for w in workers] - self._call([self._cancelcmd] + list(jobs)) + self._call([self.cancelcmd] + list(jobs)) for w in workers: with ignoring(KeyError): del self.jobs[w] diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index ff0053a1..45b5646d 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -1,48 +1,49 @@ import logging import os -import sys - -from distributed import LocalCluster -from distributed.utils import get_ip_interface from .core import JobQueueCluster logger = logging.getLogger(__name__) -dirname = os.path.dirname(sys.executable) - class PBSCluster(JobQueueCluster): """ Launch Dask on a PBS cluster Parameters ---------- - job_name : str - Name of worker jobs. Passed to `$PBS -N` option. + name : str + Name of worker jobs and Dask workers. Passed to `$PBS -N` option. queue : str Destination queue for each worker job. Passed to `#PBS -q` option. project : str Accounting string associated with each worker job. Passed to `#PBS -A` option. - threads_per_worker : int + resource_spec : str + Request resources and specify job placement. Passed to `#PBS -l` + option. + walltime : str + Walltime for each worker job. + job_extra : list + List of other PBS options, for example -j oe. Passed with '#PBS ' prefix + local_directory : str + Dask worker local directory for file spilling. + kwargs : dict + Additional keyword arguments to pass to `JobQueueCluster` and `LocalCluster` + + Inherited parameters + -------------------- + threads : int Number of threads per process. processes : int Number of processes per node. memory : str Bytes of memory that the worker can use. This should be a string like "7GB" that can be interpretted both by PBS and Dask. - resource_spec : str - Request resources and specify job placement. Passed to `#PBS -l` - option. - walltime : str - Walltime for each worker job. interface : str Network interface like 'eth0' or 'ib0'. death_timeout : float Seconds to wait for a scheduler before closing workers - job_extra : list - Additional lines to put in PBS script header (usually starting with #PBS comment) - worker_extra : str + extra : str Additional arguments to pass to `dask-worker` kwargs : dict Additional keyword arguments to pass to `LocalCluster` @@ -61,59 +62,44 @@ class PBSCluster(JobQueueCluster): >>> cluster.adapt() """ + + _submitcmd = 'qsub' + _cancelcmd = 'qdel' + + def __init__(self, name='dask-worker', queue=None, project=None, - resource_spec='select=1:ncpus=24:mem=120GB', + resource_spec='select=1:ncpus=24:mem=100GB', walltime='00:30:00', - pbs_extra=[], - threads_per_worker=4, - processes=6, - memory='20GB', - interface=None, - death_timeout=60, - worker_extra='', + job_extra=[], + local_directory='$TMPDIR', **kwargs): - self._template = """ -#!/bin/bash - -#PBS -N %(name)s -#PBS -q %(queue)s -#PBS -A %(project)s -#PBS -l %(resource_spec)s -#PBS -l walltime=%(walltime)s -#PBS -j oe - -%(base_path)s/dask-worker %(scheduler)s \ - --nthreads %(threads_per_worker)d \ - --nprocs %(processes)s \ - --memory-limit %(memory)s \ - --name %(name)s-%(n)d \ - --death-timeout %(death_timeout)s \ - %(extra)s -""".lstrip() + + super(PBSCluster, self).__init__(name=name, local_directory=local_directory, **kwargs) project = project or os.environ.get('PBS_ACCOUNT') - super(PBSCluster, self).__init__(threads_per_worker=threads_per_worker,processes=processes, - memory=memory,interface=interface,death_timeout=death_timeout,worker_extra=worker_extra, - **kwargs) - - self.config = {'name': name, - 'queue': queue, - 'project': project, - 'threads_per_worker': threads_per_worker, - 'processes': processes, - 'walltime': walltime, - 'scheduler': self.scheduler.address, - 'resource_spec': resource_spec, - 'base_path': dirname, - 'memory': memory, - 'death_timeout': death_timeout, - 'extra': worker_extra} - - self._submitcmd = 'qsub' - self._cancelcmd = 'qdel' + header_lines = ['#PBS -N %s' % name] + if queue != None: header_lines.append('#PBS -q %s' % queue) + if project != None: header_lines.append('#PBS -A %s' % project) + if resource_spec != None: header_lines.append('#PBS -l %s' % resource_spec) + if walltime != None: header_lines.append('#PBS -l walltime=%s' % walltime) + header_lines.extend(['#PBS %s' % arg for arg in job_extra]) + + self._job_header = '\n'.join(header_lines) logger.debug("Job script: \n %s" % self.job_script()) + + @property + def job_header(self): + return self._job_header + + @property + def submitcmd(self): + return self._submitcmd + + @property + def cancelcmd(self): + return self._cancelcmd diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 8828239d..681b8e47 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -1,11 +1,7 @@ import logging import os -import socket import sys -from distributed import LocalCluster -from distributed.utils import get_ip_interface - from .core import JobQueueCluster logger = logging.getLogger(__name__) @@ -30,17 +26,17 @@ class SLURMCluster(JobQueueCluster): >>> cluster.adapt() """ + + _submitcmd = 'sbatch' + _cancelcmd = 'scancel' + def __init__(self, name='dask', queue='', project=None, - threads_per_worker=4, processes=8, memory='7GB', walltime='00:30:00', - interface=None, - death_timeout=60, - extra='', **kwargs): """ Initialize a SLURM Cluster @@ -54,8 +50,6 @@ def __init__(self, project : str Accounting string associated with each worker job. Passed to `#SBATCH -A` option. - threads_per_worker : int - Number of threads per process. processes : int Number of processes per node. memory : str @@ -63,18 +57,17 @@ def __init__(self, like "7GB" that can be interpretted both by PBS and Dask. walltime : str Walltime for each worker job. - interface : str - Network interface like 'eth0' or 'ib0'. - death_timeout : float - Seconds to wait for a scheduler before closing workers - extra : str - Additional arguments to pass to `dask-worker` kwargs : dict - Additional keyword arguments to pass to `LocalCluster` + Additional keyword arguments to pass to `JobQueueCluster` and `LocalCluster` """ - self._template = """ -#!/bin/bash + super(SLURMCluster, self).__init__(name=name, processes=processes, **kwargs) + + #TODO has this been tested? This seems weird to use only processes, and not processes * threads + # There are no memory limit given to Slurm either? + + #Keeping template for now has I don't know much about slurm. + self._header_template = """ #SBATCH -J %(name)s #SBATCH -n %(processes)d #SBATCH -p %(queue)s @@ -86,43 +79,30 @@ def __init__(self, export LANG="en_US.utf8" export LANGUAGE="en_US.utf8" export LC_ALL="en_US.utf8" - -%(base_path)s/dask-worker %(scheduler)s \ - --nthreads %(threads_per_worker)d \ - --nprocs %(processes)s \ - --memory-limit %(memory)s \ - --name %(name)s-%(n)d \ - --death-timeout %(death_timeout)s \ - %(extra)s """.lstrip() - if interface: - host = get_ip_interface(interface) - extra += ' --interface %s ' % interface - else: - host = socket.gethostname() - - project = project or os.environ.get('SLURM_ACCOUNT') - if not project: - raise ValueError("Must specify a project like `project='UCLB1234' " - "or set SLURM_ACCOUNT environment variable") - self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) memory = memory.replace(' ', '') self.config = {'name': name, 'queue': queue, 'project': project, - 'threads_per_worker': threads_per_worker, 'processes': processes, - 'scheduler': self.scheduler.address, 'walltime': walltime, - 'base_path': dirname, - 'memory': memory, - 'death_timeout': death_timeout, - 'extra': extra} - self.jobs = dict() - self.n = 0 - self._adaptive = None - self._submitcmd = 'sbatch' - self._cancelcmd = 'scancel' + # Not used + 'memory': memory + } + + self._job_header = self._header_template % self.config logger.debug("Job script: \n %s" % self.job_script()) + + @property + def job_header(self): + return self._job_header + + @property + def submitcmd(self): + return self._submitcmd + + @property + def cancelcmd(self): + return self._cancelcmd From 14dc4077689f8a9e51144faf4bbd672c2763cd52 Mon Sep 17 00:00:00 2001 From: Guillaume Date: Mon, 12 Mar 2018 23:48:34 +0100 Subject: [PATCH 03/10] Comments from #10, command as class variables, flake8 check... --- dask_jobqueue/core.py | 75 +++++++++++++++---------------- dask_jobqueue/pbs.py | 31 +++++++------ dask_jobqueue/slurm.py | 13 ++---- dask_jobqueue/tests/test_pbs.py | 4 +- dask_jobqueue/tests/test_slurm.py | 9 ++-- 5 files changed, 60 insertions(+), 72 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index bce0683e..fba5f26a 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -19,8 +19,8 @@ class JobQueueCluster(object): This class should not be used directly, use inherited class appropriate for your queueing system (e.g. PBScluster or SLURMCluster) - Abstract init parameters - ------------------------ + Parameters + ---------- name : str Name of Dask workers. threads : int @@ -41,20 +41,31 @@ class JobQueueCluster(object): kwargs : dict Additional keyword arguments to pass to `LocalCluster` + Attributes + ---------- + submit_command: str + Abstract attribute for job scheduler submit command, should be overriden + cancel_command: str + Abstract attribute for job scheduler cancel command, should be overriden + See Also -------- PBSCluster SLURMCluster """ - _script_template =""" + _script_template = """ #!/bin/bash %(job_header)s -%(worker_command)s +%(worker_command)s """.lstrip() + #Following class attributes should be overriden by extending classes. + submit_command = None + cancel_command = None + def __init__(self, name='dask-worker', threads=4, @@ -68,17 +79,9 @@ def __init__(self, ): """ This initializer should be considered as Abstract, and never used directly. - :param name: - :param threads: - :param processes: - :param memory: - :param interface: - :param death_timeout: - :param local_directory: - :param extra: - :param kwargs: """ - + if not self.cancel_command or not self.submit_command: + raise NotImplementedError('JobQueueCluster is an abstract class that should not be instanciated.') if interface: host = get_ip_interface(interface) @@ -92,16 +95,23 @@ def __init__(self, self.n = 0 self._adaptive = None + #dask-worker command line build self._command_template = "%s/dask-worker %s" % (dirname, self.scheduler.address) - if threads != None: self._command_template += " --nthreads %d" % threads - if processes != None: self._command_template += " --nprocs %d" % processes - if memory != None: self._command_template += " --memory-limit %s" % memory - if name != None: + if threads is not None: + self._command_template += " --nthreads %d" % threads + if processes is not None: + self._command_template += " --nprocs %d" % processes + if memory is not None: + self._command_template += " --memory-limit %s" % memory + if name is not None: self._command_template += " --name %s" % name self._command_template += "-%(n)d" #Keep %(n) to be replaced later. - if death_timeout != None: self._command_template += " --death-timeout %s" % death_timeout - if local_directory != None: self._command_template += " --local-directory %s" % local_directory - if extra!= None: self._command_template += extra + if death_timeout is not None: + self._command_template += " --death-timeout %s" % death_timeout + if local_directory is not None: + self._command_template += " --local-directory %s" % local_directory + if extra is not None: + self._command_template += extra def job_script(self): self.n += 1 @@ -113,23 +123,10 @@ def job_script(self): def job_header(self): """ Abstract attribute for the Job scheduler script header part. - :return: A string representation of the Job script header part. - """ - raise NotImplementedError - - @property - def submitcmd(self): - """ - Abstract attribute for job scheduler submission command - :return: - """ - raise NotImplementedError - @property - def cancelcmd(self): - """ - Abstract attribute for job scheduler cancel command - :return: + Returns + ------- + A string containing multiple lines to be used as header of job file to be sumitted. """ raise NotImplementedError @@ -146,7 +143,7 @@ def start_workers(self, n=1): workers = [] for _ in range(n): with self.job_file() as fn: - out = self._call([self.submitcmd, fn]) + out = self._call([self.submit_command, fn]) job = out.decode().split('.')[0] self.jobs[self.n] = job workers.append(self.n) @@ -206,7 +203,7 @@ def stop_workers(self, workers): return workers = list(map(int, workers)) jobs = [self.jobs[w] for w in workers] - self._call([self.cancelcmd] + list(jobs)) + self._call([self.cancel_command] + list(jobs)) for w in workers: with ignoring(KeyError): del self.jobs[w] diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 45b5646d..9efce923 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -51,7 +51,7 @@ class PBSCluster(JobQueueCluster): Examples -------- >>> from dask_jobqueue import PBSCluster - >>> cluster = PBSCluster(project='...') + >>> cluster = PBSCluster(queue= 'regular', project='DaskOnPBS') >>> cluster.start_workers(10) # this may take a few seconds to launch >>> from dask.distributed import Client @@ -63,9 +63,9 @@ class PBSCluster(JobQueueCluster): >>> cluster.adapt() """ - _submitcmd = 'qsub' - _cancelcmd = 'qdel' - + #Override class variables + submit_command = 'qsub' + cancel_command = 'qdel' def __init__(self, name='dask-worker', @@ -77,15 +77,22 @@ def __init__(self, local_directory='$TMPDIR', **kwargs): + #Instantiate args and parameters from parent abstract class super(PBSCluster, self).__init__(name=name, local_directory=local_directory, **kwargs) + #Try to find a project name from environment variable project = project or os.environ.get('PBS_ACCOUNT') + #PBS header build header_lines = ['#PBS -N %s' % name] - if queue != None: header_lines.append('#PBS -q %s' % queue) - if project != None: header_lines.append('#PBS -A %s' % project) - if resource_spec != None: header_lines.append('#PBS -l %s' % resource_spec) - if walltime != None: header_lines.append('#PBS -l walltime=%s' % walltime) + if queue is not None: + header_lines.append('#PBS -q %s' % queue) + if project is not None: + header_lines.append('#PBS -A %s' % project) + if resource_spec is not None: + header_lines.append('#PBS -l %s' % resource_spec) + if walltime is not None: + header_lines.append('#PBS -l walltime=%s' % walltime) header_lines.extend(['#PBS %s' % arg for arg in job_extra]) self._job_header = '\n'.join(header_lines) @@ -95,11 +102,3 @@ def __init__(self, @property def job_header(self): return self._job_header - - @property - def submitcmd(self): - return self._submitcmd - - @property - def cancelcmd(self): - return self._cancelcmd diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 681b8e47..84adf224 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -27,8 +27,9 @@ class SLURMCluster(JobQueueCluster): >>> cluster.adapt() """ - _submitcmd = 'sbatch' - _cancelcmd = 'scancel' + #Override class variables + submit_command = 'sbatch' + cancel_command = 'scancel' def __init__(self, name='dask', @@ -98,11 +99,3 @@ def __init__(self, @property def job_header(self): return self._job_header - - @property - def submitcmd(self): - return self._submitcmd - - @property - def cancelcmd(self): - return self._cancelcmd diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index dfe65df4..cde12568 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -5,11 +5,11 @@ from dask.distributed import Client from distributed.utils_test import loop # noqa: F401 -from pangeo import PBSCluster +from dask_jobqueue import PBSCluster def test_basic(loop): - with PBSCluster(walltime='00:02:00', threads_per_worker=2, memory='7GB', + with PBSCluster(walltime='00:02:00', threads=2, memory='7GB', interface='ib0', loop=loop) as cluster: with Client(cluster) as client: workers = cluster.start_workers(2) diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index 6d542782..629da7b9 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -5,12 +5,12 @@ from dask.distributed import Client from distributed.utils_test import loop # noqa: F401 -from pangeo import SLURMCluster +from dask_jobqueue import SLURMCluster def test_basic(loop): - with SLURMCluster(walltime='00:02:00', threads_per_worker=2, memory='7GB', - loop=loop) as cluster: + with SLURMCluster(walltime='00:02:00', threads=2, memory='7GB', + loop=loop) as cluster: with Client(cluster) as client: workers = cluster.start_workers(2) future = client.submit(lambda x: x + 1, 10) @@ -42,8 +42,7 @@ def test_adaptive(loop): assert cluster.jobs start = time() - while len(client.scheduler_info()['workers']) \ - != cluster.config['processes']: + while len(client.scheduler_info()['workers']) != cluster.config['processes']: sleep(0.1) assert time() < start + 10 From 899aa5dfe2c399434a92b0e209eccc5bff49ce92 Mon Sep 17 00:00:00 2001 From: Guillaume Date: Sat, 17 Mar 2018 22:33:50 +0100 Subject: [PATCH 04/10] Taking into account @jhamman comments --- dask_jobqueue/core.py | 16 ++++------------ dask_jobqueue/pbs.py | 14 +++++--------- dask_jobqueue/slurm.py | 5 +---- 3 files changed, 10 insertions(+), 25 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index fba5f26a..c30b6b8c 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -83,6 +83,9 @@ def __init__(self, if not self.cancel_command or not self.submit_command: raise NotImplementedError('JobQueueCluster is an abstract class that should not be instanciated.') + #This attribute should be overriden + self.job_header = None + if interface: host = get_ip_interface(interface) extra += ' --interface %s ' % interface @@ -96,7 +99,7 @@ def __init__(self, self._adaptive = None #dask-worker command line build - self._command_template = "%s/dask-worker %s" % (dirname, self.scheduler.address) + self._command_template = os.path.join(dirname, 'dask-worker %s' % self.scheduler.address) if threads is not None: self._command_template += " --nthreads %d" % threads if processes is not None: @@ -119,17 +122,6 @@ def job_script(self): 'worker_command': self._command_template % {'n': self.n} } - @property - def job_header(self): - """ - Abstract attribute for the Job scheduler script header part. - - Returns - ------- - A string containing multiple lines to be used as header of job file to be sumitted. - """ - raise NotImplementedError - @contextmanager def job_file(self): """ Write job submission script to temporary file """ diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 9efce923..f2a6071a 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -24,7 +24,7 @@ class PBSCluster(JobQueueCluster): walltime : str Walltime for each worker job. job_extra : list - List of other PBS options, for example -j oe. Passed with '#PBS ' prefix + List of other PBS options, for example -j oe. Each option will be prepended with the #PBS prefix. local_directory : str Dask worker local directory for file spilling. kwargs : dict @@ -51,7 +51,7 @@ class PBSCluster(JobQueueCluster): Examples -------- >>> from dask_jobqueue import PBSCluster - >>> cluster = PBSCluster(queue= 'regular', project='DaskOnPBS') + >>> cluster = PBSCluster(queue='regular', project='DaskOnPBS') >>> cluster.start_workers(10) # this may take a few seconds to launch >>> from dask.distributed import Client @@ -74,11 +74,10 @@ def __init__(self, resource_spec='select=1:ncpus=24:mem=100GB', walltime='00:30:00', job_extra=[], - local_directory='$TMPDIR', **kwargs): #Instantiate args and parameters from parent abstract class - super(PBSCluster, self).__init__(name=name, local_directory=local_directory, **kwargs) + super(PBSCluster, self).__init__(name=name, **kwargs) #Try to find a project name from environment variable project = project or os.environ.get('PBS_ACCOUNT') @@ -95,10 +94,7 @@ def __init__(self, header_lines.append('#PBS -l walltime=%s' % walltime) header_lines.extend(['#PBS %s' % arg for arg in job_extra]) - self._job_header = '\n'.join(header_lines) + #Declare class attribute that shall be overriden + self.job_header = '\n'.join(header_lines) logger.debug("Job script: \n %s" % self.job_script()) - - @property - def job_header(self): - return self._job_header diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 84adf224..5b1a8ad0 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -92,10 +92,7 @@ def __init__(self, 'memory': memory } - self._job_header = self._header_template % self.config + self.job_header = self._header_template % self.config logger.debug("Job script: \n %s" % self.job_script()) - @property - def job_header(self): - return self._job_header From 1bfe7cd49d436b35a5efb241b72a1919f93040da Mon Sep 17 00:00:00 2001 From: Guillaume Date: Sat, 17 Mar 2018 22:44:47 +0100 Subject: [PATCH 05/10] Add some notes on resource_spec and local_directory --- dask_jobqueue/pbs.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index f2a6071a..339aaa7a 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -9,6 +9,7 @@ class PBSCluster(JobQueueCluster): """ Launch Dask on a PBS cluster + Parameters ---------- name : str @@ -61,6 +62,11 @@ class PBSCluster(JobQueueCluster): kill workers based on load. >>> cluster.adapt() + + It is a good practice to define local_directory to your PBS system scratch directory, + and you should specify resource_spec according to the processes and threads asked: + >>> cluster = PBSCluster(queue='regular', project='DaskOnPBS',local_directory=os.getenv('TMPDIR', '/tmp'), \ + threads=4, processes=6, memory='16GB', resource_spec='select=1:ncpus=24:mem=100GB') """ #Override class variables From 151bd98894a5878f114b57ecaf5f2b81d5b3d284 Mon Sep 17 00:00:00 2001 From: Guillaume Date: Wed, 21 Mar 2018 21:38:22 +0100 Subject: [PATCH 06/10] default resource_spec to None. resource_spec computed from memory, threads and processes if not given --- dask_jobqueue/core.py | 7 ++++++- dask_jobqueue/pbs.py | 37 ++++++++++++++++++++++++++++++++++--- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index c30b6b8c..1b1f21c2 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -5,7 +5,7 @@ import os import sys -from distributed.utils import tmpfile, ignoring, get_ip_interface +from distributed.utils import tmpfile, ignoring, get_ip_interface, parse_bytes, format_bytes from distributed import LocalCluster dirname = os.path.dirname(sys.executable) @@ -94,6 +94,11 @@ def __init__(self, self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) + #Keep information on process, threads and memory, for use in subclasses + self.worker_memory = parse_bytes(memory) + self.worker_processes = processes + self.worker_threads = threads + self.jobs = dict() self.n = 0 self._adaptive = None diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 339aaa7a..31a50d3d 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -1,7 +1,9 @@ import logging import os +import math from .core import JobQueueCluster +from distributed.utils import format_bytes logger = logging.getLogger(__name__) @@ -77,7 +79,7 @@ def __init__(self, name='dask-worker', queue=None, project=None, - resource_spec='select=1:ncpus=24:mem=100GB', + resource_spec=None, walltime='00:30:00', job_extra=[], **kwargs): @@ -94,8 +96,13 @@ def __init__(self, header_lines.append('#PBS -q %s' % queue) if project is not None: header_lines.append('#PBS -A %s' % project) - if resource_spec is not None: - header_lines.append('#PBS -l %s' % resource_spec) + if resource_spec is None: + #Compute default resources specifications + ncpus = self.worker_processes * self.worker_threads + memory_string = pbs_format_bytes_ceil(self.worker_memory * self.worker_processes) + resource_spec = "select=1:ncpus=%d:mem=%s" % (ncpus, memory_string) + logger.info("Resource specification for PBS not set, initializing it to %s" % resource_spec) + header_lines.append('#PBS -l %s' % resource_spec) if walltime is not None: header_lines.append('#PBS -l walltime=%s' % walltime) header_lines.extend(['#PBS %s' % arg for arg in job_extra]) @@ -104,3 +111,27 @@ def __init__(self, self.job_header = '\n'.join(header_lines) logger.debug("Job script: \n %s" % self.job_script()) + +def pbs_format_bytes_ceil(n): + """ Format bytes as text + PBS expects KiB, MiB or Gib, but names it KB, MB, GB + Whereas Dask makes the difference between KB and KiB + + >>> pbs_format_bytes_ceil(1) + '1B' + >>> pbs_format_bytes_ceil(1234) + '1234B' + >>> pbs_format_bytes_ceil(12345678) + '13MB' + >>> pbs_format_bytes_ceil(1234567890) + '1177MB' + >>> pbs_format_bytes_ceil(15000000000) + '14GB' + """ + if n >= 10*(1024**3): + return '%dGB' % math.ceil(n / (1024**3)) + if n >= 10*(1024**2): + return '%dMB' % (n / (1024**2)) + if n >= 10*1024: + return '%dkB' % (n / 1024) + return '%dB' % n \ No newline at end of file From a0175383479d48bf1a5013ddcd391ba665b69f38 Mon Sep 17 00:00:00 2001 From: Guillaume Date: Fri, 23 Mar 2018 21:41:18 +0100 Subject: [PATCH 07/10] Completed docstrings from PBS and SLURMCluster. Removed some TODO --- dask_jobqueue/pbs.py | 8 ++++++-- dask_jobqueue/slurm.py | 25 ++++++++++++++++++++++--- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 31a50d3d..71990fcc 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -33,8 +33,10 @@ class PBSCluster(JobQueueCluster): kwargs : dict Additional keyword arguments to pass to `JobQueueCluster` and `LocalCluster` - Inherited parameters - -------------------- + Inherited parameters from JobQueueCluster + ----------------------------------------- + name : str + Name of Dask workers. threads : int Number of threads per process. processes : int @@ -46,6 +48,8 @@ class PBSCluster(JobQueueCluster): Network interface like 'eth0' or 'ib0'. death_timeout : float Seconds to wait for a scheduler before closing workers + local_directory : str + Dask worker local directory for file spilling. extra : str Additional arguments to pass to `dask-worker` kwargs : dict diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 5b1a8ad0..02385404 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -60,13 +60,32 @@ def __init__(self, Walltime for each worker job. kwargs : dict Additional keyword arguments to pass to `JobQueueCluster` and `LocalCluster` + + Inherited parameters from JobQueueCluster + ----------------------------------------- + name : str + Name of Dask workers. + threads : int + Number of threads per process. + processes : int + Number of processes per node. + memory : str + Bytes of memory that the worker can use. This should be a string + like "7GB" that can be interpretted both by PBS and Dask. + interface : str + Network interface like 'eth0' or 'ib0'. + death_timeout : float + Seconds to wait for a scheduler before closing workers + local_directory : str + Dask worker local directory for file spilling. + extra : str + Additional arguments to pass to `dask-worker` + kwargs : dict + Additional keyword arguments to pass to `LocalCluster` """ super(SLURMCluster, self).__init__(name=name, processes=processes, **kwargs) - #TODO has this been tested? This seems weird to use only processes, and not processes * threads - # There are no memory limit given to Slurm either? - #Keeping template for now has I don't know much about slurm. self._header_template = """ #SBATCH -J %(name)s From 4ba6dd45e302b7248331607cbf2e531051c17d3a Mon Sep 17 00:00:00 2001 From: Guillaume Date: Fri, 23 Mar 2018 21:42:28 +0100 Subject: [PATCH 08/10] Removing comment --- dask_jobqueue/slurm.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 02385404..2063cc4b 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -60,7 +60,7 @@ def __init__(self, Walltime for each worker job. kwargs : dict Additional keyword arguments to pass to `JobQueueCluster` and `LocalCluster` - + Inherited parameters from JobQueueCluster ----------------------------------------- name : str @@ -86,7 +86,6 @@ def __init__(self, super(SLURMCluster, self).__init__(name=name, processes=processes, **kwargs) - #Keeping template for now has I don't know much about slurm. self._header_template = """ #SBATCH -J %(name)s #SBATCH -n %(processes)d From ad2ddfa55ba1e616090b4687689725dcb1797172 Mon Sep 17 00:00:00 2001 From: Guillaume Date: Fri, 23 Mar 2018 22:07:16 +0100 Subject: [PATCH 09/10] flake quality checks --- dask_jobqueue/core.py | 2 +- dask_jobqueue/pbs.py | 10 +++++----- dask_jobqueue/slurm.py | 1 - 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 1b1f21c2..6d41402f 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -5,7 +5,7 @@ import os import sys -from distributed.utils import tmpfile, ignoring, get_ip_interface, parse_bytes, format_bytes +from distributed.utils import tmpfile, ignoring, get_ip_interface, parse_bytes from distributed import LocalCluster dirname = os.path.dirname(sys.executable) diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 71990fcc..0266d45a 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -3,7 +3,6 @@ import math from .core import JobQueueCluster -from distributed.utils import format_bytes logger = logging.getLogger(__name__) @@ -116,6 +115,7 @@ def __init__(self, logger.debug("Job script: \n %s" % self.job_script()) + def pbs_format_bytes_ceil(n): """ Format bytes as text PBS expects KiB, MiB or Gib, but names it KB, MB, GB @@ -132,10 +132,10 @@ def pbs_format_bytes_ceil(n): >>> pbs_format_bytes_ceil(15000000000) '14GB' """ - if n >= 10*(1024**3): + if n >= 10 * (1024**3): return '%dGB' % math.ceil(n / (1024**3)) - if n >= 10*(1024**2): + if n >= 10 * (1024**2): return '%dMB' % (n / (1024**2)) - if n >= 10*1024: + if n >= 10 * 1024: return '%dkB' % (n / 1024) - return '%dB' % n \ No newline at end of file + return '%dB' % n diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 2063cc4b..7e743b40 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -113,4 +113,3 @@ def __init__(self, self.job_header = self._header_template % self.config logger.debug("Job script: \n %s" % self.job_script()) - From b0cae6d6a572d9d295173e85e96cd2eb343a4c18 Mon Sep 17 00:00:00 2001 From: Guillaume Date: Tue, 27 Mar 2018 00:04:46 +0200 Subject: [PATCH 10/10] Docstring inheritance using docrep --- dask_jobqueue/core.py | 3 +++ dask_jobqueue/pbs.py | 37 ++++++------------------------------- dask_jobqueue/slurm.py | 35 +++++------------------------------ setup.py | 1 + 4 files changed, 15 insertions(+), 61 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 6d41402f..771c2a2a 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -4,6 +4,7 @@ import socket import os import sys +import docrep from distributed.utils import tmpfile, ignoring, get_ip_interface, parse_bytes from distributed import LocalCluster @@ -11,8 +12,10 @@ dirname = os.path.dirname(sys.executable) logger = logging.getLogger(__name__) +docstrings = docrep.DocstringProcessor() +@docstrings.get_sectionsf('JobQueueCluster') class JobQueueCluster(object): """ Base class to launch Dask Clusters for Job queues diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 0266d45a..6a2ac570 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -2,19 +2,17 @@ import os import math -from .core import JobQueueCluster +from .core import JobQueueCluster, docstrings logger = logging.getLogger(__name__) +@docstrings.with_indent(4) class PBSCluster(JobQueueCluster): """ Launch Dask on a PBS cluster - Parameters ---------- - name : str - Name of worker jobs and Dask workers. Passed to `$PBS -N` option. queue : str Destination queue for each worker job. Passed to `#PBS -q` option. project : str @@ -29,30 +27,7 @@ class PBSCluster(JobQueueCluster): List of other PBS options, for example -j oe. Each option will be prepended with the #PBS prefix. local_directory : str Dask worker local directory for file spilling. - kwargs : dict - Additional keyword arguments to pass to `JobQueueCluster` and `LocalCluster` - - Inherited parameters from JobQueueCluster - ----------------------------------------- - name : str - Name of Dask workers. - threads : int - Number of threads per process. - processes : int - Number of processes per node. - memory : str - Bytes of memory that the worker can use. This should be a string - like "7GB" that can be interpretted both by PBS and Dask. - interface : str - Network interface like 'eth0' or 'ib0'. - death_timeout : float - Seconds to wait for a scheduler before closing workers - local_directory : str - Dask worker local directory for file spilling. - extra : str - Additional arguments to pass to `dask-worker` - kwargs : dict - Additional keyword arguments to pass to `LocalCluster` + %(JobQueueCluster.parameters)s Examples -------- @@ -79,7 +54,6 @@ class PBSCluster(JobQueueCluster): cancel_command = 'qdel' def __init__(self, - name='dask-worker', queue=None, project=None, resource_spec=None, @@ -88,13 +62,14 @@ def __init__(self, **kwargs): #Instantiate args and parameters from parent abstract class - super(PBSCluster, self).__init__(name=name, **kwargs) + super(PBSCluster, self).__init__(**kwargs) #Try to find a project name from environment variable project = project or os.environ.get('PBS_ACCOUNT') #PBS header build - header_lines = ['#PBS -N %s' % name] + if self.name is not None: + header_lines = ['#PBS -N %s' % self.name] if queue is not None: header_lines.append('#PBS -q %s' % queue) if project is not None: diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 7e743b40..5c3c0c87 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -2,13 +2,14 @@ import os import sys -from .core import JobQueueCluster +from .core import JobQueueCluster, docstrings logger = logging.getLogger(__name__) dirname = os.path.dirname(sys.executable) +@docstrings.with_indent(4) class SLURMCluster(JobQueueCluster): """ Launch Dask on a SLURM cluster @@ -32,7 +33,6 @@ class SLURMCluster(JobQueueCluster): cancel_command = 'scancel' def __init__(self, - name='dask', queue='', project=None, processes=8, @@ -43,8 +43,6 @@ def __init__(self, Parameters ---------- - name : str - Name of worker jobs. Passed to `#SBATCH -J` option. queue : str Destination queue for each worker job. Passed to `#SBATCH -p` option. @@ -58,33 +56,10 @@ def __init__(self, like "7GB" that can be interpretted both by PBS and Dask. walltime : str Walltime for each worker job. - kwargs : dict - Additional keyword arguments to pass to `JobQueueCluster` and `LocalCluster` - - Inherited parameters from JobQueueCluster - ----------------------------------------- - name : str - Name of Dask workers. - threads : int - Number of threads per process. - processes : int - Number of processes per node. - memory : str - Bytes of memory that the worker can use. This should be a string - like "7GB" that can be interpretted both by PBS and Dask. - interface : str - Network interface like 'eth0' or 'ib0'. - death_timeout : float - Seconds to wait for a scheduler before closing workers - local_directory : str - Dask worker local directory for file spilling. - extra : str - Additional arguments to pass to `dask-worker` - kwargs : dict - Additional keyword arguments to pass to `LocalCluster` + %(JobQueueCluster.parameters)s """ - super(SLURMCluster, self).__init__(name=name, processes=processes, **kwargs) + super(SLURMCluster, self).__init__(processes=processes, **kwargs) self._header_template = """ #SBATCH -J %(name)s @@ -101,7 +76,7 @@ def __init__(self, """.lstrip() memory = memory.replace(' ', '') - self.config = {'name': name, + self.config = {'name': self.name, 'queue': queue, 'project': project, 'processes': processes, diff --git a/setup.py b/setup.py index b82a3938..c5234520 100755 --- a/setup.py +++ b/setup.py @@ -10,4 +10,5 @@ license='BSD 3-Clause', packages=['dask_jobqueue'], long_description=(open('README.rst').read() if exists('README.rst') else ''), + install_requires=['docrep'], zip_safe=False)