diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index f0ea6dc7..771c2a2a 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -1,30 +1,134 @@ from contextlib import contextmanager import logging import subprocess -import toolz +import socket +import os +import sys +import docrep -from distributed.utils import tmpfile, ignoring +from distributed.utils import tmpfile, ignoring, get_ip_interface, parse_bytes +from distributed import LocalCluster + +dirname = os.path.dirname(sys.executable) logger = logging.getLogger(__name__) +docstrings = docrep.DocstringProcessor() +@docstrings.get_sectionsf('JobQueueCluster') class JobQueueCluster(object): """ Base class to launch Dask Clusters for Job queues This class should not be used directly, use inherited class appropriate for your queueing system (e.g. PBScluster or SLURMCluster) + Parameters + ---------- + name : str + Name of Dask workers. + threads : int + Number of threads per process. + processes : int + Number of processes per node. + memory : str + Bytes of memory that the worker can use. This should be a string + like "7GB" that can be interpretted both by PBS and Dask. + interface : str + Network interface like 'eth0' or 'ib0'. + death_timeout : float + Seconds to wait for a scheduler before closing workers + local_directory : str + Dask worker local directory for file spilling. + extra : str + Additional arguments to pass to `dask-worker` + kwargs : dict + Additional keyword arguments to pass to `LocalCluster` + + Attributes + ---------- + submit_command: str + Abstract attribute for job scheduler submit command, should be overriden + cancel_command: str + Abstract attribute for job scheduler cancel command, should be overriden + See Also -------- PBSCluster SLURMCluster """ - def __init__(self): - raise NotImplemented + + _script_template = """ +#!/bin/bash + +%(job_header)s + +%(worker_command)s +""".lstrip() + + #Following class attributes should be overriden by extending classes. + submit_command = None + cancel_command = None + + def __init__(self, + name='dask-worker', + threads=4, + processes=6, + memory='16GB', + interface=None, + death_timeout=60, + local_directory=None, + extra='', + **kwargs + ): + """ + This initializer should be considered as Abstract, and never used directly. + """ + if not self.cancel_command or not self.submit_command: + raise NotImplementedError('JobQueueCluster is an abstract class that should not be instanciated.') + + #This attribute should be overriden + self.job_header = None + + if interface: + host = get_ip_interface(interface) + extra += ' --interface %s ' % interface + else: + host = socket.gethostname() + + self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) + + #Keep information on process, threads and memory, for use in subclasses + self.worker_memory = parse_bytes(memory) + self.worker_processes = processes + self.worker_threads = threads + + self.jobs = dict() + self.n = 0 + self._adaptive = None + + #dask-worker command line build + self._command_template = os.path.join(dirname, 'dask-worker %s' % self.scheduler.address) + if threads is not None: + self._command_template += " --nthreads %d" % threads + if processes is not None: + self._command_template += " --nprocs %d" % processes + if memory is not None: + self._command_template += " --memory-limit %s" % memory + if name is not None: + self._command_template += " --name %s" % name + self._command_template += "-%(n)d" #Keep %(n) to be replaced later. + if death_timeout is not None: + self._command_template += " --death-timeout %s" % death_timeout + if local_directory is not None: + self._command_template += " --local-directory %s" % local_directory + if extra is not None: + self._command_template += extra def job_script(self): self.n += 1 - return self._template % toolz.merge(self.config, {'n': self.n}) + return self._script_template % {'job_header': self.job_header, + 'worker_command': self._command_template % {'n': self.n} + } @contextmanager def job_file(self): @@ -39,7 +143,7 @@ def start_workers(self, n=1): workers = [] for _ in range(n): with self.job_file() as fn: - out = self._call([self._submitcmd, fn]) + out = self._call([self.submit_command, fn]) job = out.decode().split('.')[0] self.jobs[self.n] = job workers.append(self.n) @@ -99,7 +203,7 @@ def stop_workers(self, workers): return workers = list(map(int, workers)) jobs = [self.jobs[w] for w in workers] - self._call([self._cancelcmd] + list(jobs)) + self._call([self.cancel_command] + list(jobs)) for w in workers: with ignoring(KeyError): del self.jobs[w] diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index cf634cbd..6a2ac570 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -1,55 +1,38 @@ import logging import os -import socket -import sys +import math -from distributed import LocalCluster -from distributed.utils import get_ip_interface - -from .core import JobQueueCluster +from .core import JobQueueCluster, docstrings logger = logging.getLogger(__name__) -dirname = os.path.dirname(sys.executable) - +@docstrings.with_indent(4) class PBSCluster(JobQueueCluster): """ Launch Dask on a PBS cluster Parameters ---------- - name : str - Name of worker jobs. Passed to `$PBS -N` option. queue : str Destination queue for each worker job. Passed to `#PBS -q` option. project : str Accounting string associated with each worker job. Passed to `#PBS -A` option. - threads_per_worker : int - Number of threads per process. - processes : int - Number of processes per node. - memory : str - Bytes of memory that the worker can use. This should be a string - like "7GB" that can be interpretted both by PBS and Dask. resource_spec : str Request resources and specify job placement. Passed to `#PBS -l` option. walltime : str Walltime for each worker job. - interface : str - Network interface like 'eth0' or 'ib0'. - death_timeout : float - Seconds to wait for a scheduler before closing workers - extra : str - Additional arguments to pass to `dask-worker` - kwargs : dict - Additional keyword arguments to pass to `LocalCluster` + job_extra : list + List of other PBS options, for example -j oe. Each option will be prepended with the #PBS prefix. + local_directory : str + Dask worker local directory for file spilling. + %(JobQueueCluster.parameters)s Examples -------- >>> from dask_jobqueue import PBSCluster - >>> cluster = PBSCluster(project='...') + >>> cluster = PBSCluster(queue='regular', project='DaskOnPBS') >>> cluster.start_workers(10) # this may take a few seconds to launch >>> from dask.distributed import Client @@ -59,67 +42,75 @@ class PBSCluster(JobQueueCluster): kill workers based on load. >>> cluster.adapt() + + It is a good practice to define local_directory to your PBS system scratch directory, + and you should specify resource_spec according to the processes and threads asked: + >>> cluster = PBSCluster(queue='regular', project='DaskOnPBS',local_directory=os.getenv('TMPDIR', '/tmp'), \ + threads=4, processes=6, memory='16GB', resource_spec='select=1:ncpus=24:mem=100GB') """ + + #Override class variables + submit_command = 'qsub' + cancel_command = 'qdel' + def __init__(self, - name='dask', - queue='regular', + queue=None, project=None, - resource_spec='select=1:ncpus=36:mem=109GB', - threads_per_worker=4, - processes=9, - memory='7GB', + resource_spec=None, walltime='00:30:00', - interface=None, - death_timeout=60, - extra='', + job_extra=[], **kwargs): - self._template = """ -#!/bin/bash - -#PBS -N %(name)s -#PBS -q %(queue)s -#PBS -A %(project)s -#PBS -l %(resource_spec)s -#PBS -l walltime=%(walltime)s -#PBS -j oe - -%(base_path)s/dask-worker %(scheduler)s \ - --nthreads %(threads_per_worker)d \ - --nprocs %(processes)s \ - --memory-limit %(memory)s \ - --name %(name)s-%(n)d \ - --death-timeout %(death_timeout)s \ - %(extra)s -""".lstrip() - - if interface: - host = get_ip_interface(interface) - extra += ' --interface %s ' % interface - else: - host = socket.gethostname() + #Instantiate args and parameters from parent abstract class + super(PBSCluster, self).__init__(**kwargs) + + #Try to find a project name from environment variable project = project or os.environ.get('PBS_ACCOUNT') - if not project: - raise ValueError("Must specify a project like `project='UCLB1234' " - "or set PBS_ACCOUNT environment variable") - self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) - memory = memory.replace(' ', '') - self.config = {'name': name, - 'queue': queue, - 'project': project, - 'threads_per_worker': threads_per_worker, - 'processes': processes, - 'walltime': walltime, - 'scheduler': self.scheduler.address, - 'resource_spec': resource_spec, - 'base_path': dirname, - 'memory': memory, - 'death_timeout': death_timeout, - 'extra': extra} - self.jobs = dict() - self.n = 0 - self._adaptive = None - self._submitcmd = 'qsub' - self._cancelcmd = 'qdel' + + #PBS header build + if self.name is not None: + header_lines = ['#PBS -N %s' % self.name] + if queue is not None: + header_lines.append('#PBS -q %s' % queue) + if project is not None: + header_lines.append('#PBS -A %s' % project) + if resource_spec is None: + #Compute default resources specifications + ncpus = self.worker_processes * self.worker_threads + memory_string = pbs_format_bytes_ceil(self.worker_memory * self.worker_processes) + resource_spec = "select=1:ncpus=%d:mem=%s" % (ncpus, memory_string) + logger.info("Resource specification for PBS not set, initializing it to %s" % resource_spec) + header_lines.append('#PBS -l %s' % resource_spec) + if walltime is not None: + header_lines.append('#PBS -l walltime=%s' % walltime) + header_lines.extend(['#PBS %s' % arg for arg in job_extra]) + + #Declare class attribute that shall be overriden + self.job_header = '\n'.join(header_lines) logger.debug("Job script: \n %s" % self.job_script()) + + +def pbs_format_bytes_ceil(n): + """ Format bytes as text + PBS expects KiB, MiB or Gib, but names it KB, MB, GB + Whereas Dask makes the difference between KB and KiB + + >>> pbs_format_bytes_ceil(1) + '1B' + >>> pbs_format_bytes_ceil(1234) + '1234B' + >>> pbs_format_bytes_ceil(12345678) + '13MB' + >>> pbs_format_bytes_ceil(1234567890) + '1177MB' + >>> pbs_format_bytes_ceil(15000000000) + '14GB' + """ + if n >= 10 * (1024**3): + return '%dGB' % math.ceil(n / (1024**3)) + if n >= 10 * (1024**2): + return '%dMB' % (n / (1024**2)) + if n >= 10 * 1024: + return '%dkB' % (n / 1024) + return '%dB' % n diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 8828239d..5c3c0c87 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -1,18 +1,15 @@ import logging import os -import socket import sys -from distributed import LocalCluster -from distributed.utils import get_ip_interface - -from .core import JobQueueCluster +from .core import JobQueueCluster, docstrings logger = logging.getLogger(__name__) dirname = os.path.dirname(sys.executable) +@docstrings.with_indent(4) class SLURMCluster(JobQueueCluster): """ Launch Dask on a SLURM cluster @@ -30,32 +27,28 @@ class SLURMCluster(JobQueueCluster): >>> cluster.adapt() """ + + #Override class variables + submit_command = 'sbatch' + cancel_command = 'scancel' + def __init__(self, - name='dask', queue='', project=None, - threads_per_worker=4, processes=8, memory='7GB', walltime='00:30:00', - interface=None, - death_timeout=60, - extra='', **kwargs): """ Initialize a SLURM Cluster Parameters ---------- - name : str - Name of worker jobs. Passed to `#SBATCH -J` option. queue : str Destination queue for each worker job. Passed to `#SBATCH -p` option. project : str Accounting string associated with each worker job. Passed to `#SBATCH -A` option. - threads_per_worker : int - Number of threads per process. processes : int Number of processes per node. memory : str @@ -63,18 +56,12 @@ def __init__(self, like "7GB" that can be interpretted both by PBS and Dask. walltime : str Walltime for each worker job. - interface : str - Network interface like 'eth0' or 'ib0'. - death_timeout : float - Seconds to wait for a scheduler before closing workers - extra : str - Additional arguments to pass to `dask-worker` - kwargs : dict - Additional keyword arguments to pass to `LocalCluster` + %(JobQueueCluster.parameters)s """ - self._template = """ -#!/bin/bash + super(SLURMCluster, self).__init__(processes=processes, **kwargs) + + self._header_template = """ #SBATCH -J %(name)s #SBATCH -n %(processes)d #SBATCH -p %(queue)s @@ -86,43 +73,18 @@ def __init__(self, export LANG="en_US.utf8" export LANGUAGE="en_US.utf8" export LC_ALL="en_US.utf8" - -%(base_path)s/dask-worker %(scheduler)s \ - --nthreads %(threads_per_worker)d \ - --nprocs %(processes)s \ - --memory-limit %(memory)s \ - --name %(name)s-%(n)d \ - --death-timeout %(death_timeout)s \ - %(extra)s """.lstrip() - if interface: - host = get_ip_interface(interface) - extra += ' --interface %s ' % interface - else: - host = socket.gethostname() - - project = project or os.environ.get('SLURM_ACCOUNT') - if not project: - raise ValueError("Must specify a project like `project='UCLB1234' " - "or set SLURM_ACCOUNT environment variable") - self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs) memory = memory.replace(' ', '') - self.config = {'name': name, + self.config = {'name': self.name, 'queue': queue, 'project': project, - 'threads_per_worker': threads_per_worker, 'processes': processes, - 'scheduler': self.scheduler.address, 'walltime': walltime, - 'base_path': dirname, - 'memory': memory, - 'death_timeout': death_timeout, - 'extra': extra} - self.jobs = dict() - self.n = 0 - self._adaptive = None - self._submitcmd = 'sbatch' - self._cancelcmd = 'scancel' + # Not used + 'memory': memory + } + + self.job_header = self._header_template % self.config logger.debug("Job script: \n %s" % self.job_script()) diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index dfe65df4..cde12568 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -5,11 +5,11 @@ from dask.distributed import Client from distributed.utils_test import loop # noqa: F401 -from pangeo import PBSCluster +from dask_jobqueue import PBSCluster def test_basic(loop): - with PBSCluster(walltime='00:02:00', threads_per_worker=2, memory='7GB', + with PBSCluster(walltime='00:02:00', threads=2, memory='7GB', interface='ib0', loop=loop) as cluster: with Client(cluster) as client: workers = cluster.start_workers(2) diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index 6d542782..629da7b9 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -5,12 +5,12 @@ from dask.distributed import Client from distributed.utils_test import loop # noqa: F401 -from pangeo import SLURMCluster +from dask_jobqueue import SLURMCluster def test_basic(loop): - with SLURMCluster(walltime='00:02:00', threads_per_worker=2, memory='7GB', - loop=loop) as cluster: + with SLURMCluster(walltime='00:02:00', threads=2, memory='7GB', + loop=loop) as cluster: with Client(cluster) as client: workers = cluster.start_workers(2) future = client.submit(lambda x: x + 1, 10) @@ -42,8 +42,7 @@ def test_adaptive(loop): assert cluster.jobs start = time() - while len(client.scheduler_info()['workers']) \ - != cluster.config['processes']: + while len(client.scheduler_info()['workers']) != cluster.config['processes']: sleep(0.1) assert time() < start + 10 diff --git a/setup.py b/setup.py index b82a3938..c5234520 100755 --- a/setup.py +++ b/setup.py @@ -10,4 +10,5 @@ license='BSD 3-Clause', packages=['dask_jobqueue'], long_description=(open('README.rst').read() if exists('README.rst') else ''), + install_requires=['docrep'], zip_safe=False)