diff --git a/README.rst b/README.rst index db3aee22..edd5b615 100644 --- a/README.rst +++ b/README.rst @@ -10,7 +10,7 @@ Example from dask_jobqueue import PBSCluster - cluster = PBSCluster(project='...') + cluster = PBSCluster(processes=6, threads=4, memory="16GB") cluster.start_workers(10) from dask.distributed import Client diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index c1d8f013..7806c937 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -42,6 +42,8 @@ class JobQueueCluster(Cluster): Dask worker local directory for file spilling. extra : str Additional arguments to pass to `dask-worker` + env_extra : list + Other commands to add to script before launching worker. kwargs : dict Additional keyword arguments to pass to `LocalCluster` @@ -63,6 +65,8 @@ class JobQueueCluster(Cluster): %(job_header)s +%(env_header)s + %(worker_command)s """.lstrip() @@ -72,13 +76,14 @@ class JobQueueCluster(Cluster): def __init__(self, name='dask-worker', - threads=4, - processes=6, - memory='16GB', + threads=2, + processes=4, + memory='8GB', interface=None, death_timeout=60, local_directory=None, extra='', + env_extra=[], **kwargs ): """ @@ -102,11 +107,14 @@ def __init__(self, self.worker_memory = parse_bytes(memory) self.worker_processes = processes self.worker_threads = threads + self.name = name self.jobs = dict() self.n = 0 self._adaptive = None + self._env_header = '\n'.join(env_extra) + #dask-worker command line build self._command_template = os.path.join(dirname, 'dask-worker %s' % self.scheduler.address) if threads is not None: @@ -128,6 +136,7 @@ def __init__(self, def job_script(self): self.n += 1 return self._script_template % {'job_header': self.job_header, + 'env_header': self._env_header, 'worker_command': self._command_template % {'n': self.n} } diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 6a2ac570..dc39ccf0 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -25,8 +25,6 @@ class 
PBSCluster(JobQueueCluster): Walltime for each worker job. job_extra : list List of other PBS options, for example -j oe. Each option will be prepended with the #PBS prefix. - local_directory : str - Dask worker local directory for file spilling. %(JobQueueCluster.parameters)s Examples @@ -67,9 +65,10 @@ def __init__(self, #Try to find a project name from environment variable project = project or os.environ.get('PBS_ACCOUNT') + header_lines = [] #PBS header build if self.name is not None: - header_lines = ['#PBS -N %s' % self.name] + header_lines.append('#PBS -N %s' % self.name) if queue is not None: header_lines.append('#PBS -q %s' % queue) if project is not None: @@ -110,7 +109,7 @@ def pbs_format_bytes_ceil(n): if n >= 10 * (1024**3): return '%dGB' % math.ceil(n / (1024**3)) if n >= 10 * (1024**2): - return '%dMB' % (n / (1024**2)) + return '%dMB' % math.ceil(n / (1024**2)) if n >= 10 * 1024: - return '%dkB' % (n / 1024) + return '%dkB' % math.ceil(n / 1024) return '%dB' % n diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 5c3c0c87..256dec5c 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -1,6 +1,7 @@ import logging import os import sys +import math from .core import JobQueueCluster, docstrings @@ -13,10 +14,29 @@ class SLURMCluster(JobQueueCluster): """ Launch Dask on a SLURM cluster + Parameters + ---------- + queue : str + Destination queue for each worker job. Passed to `#SBATCH -p` option. + project : str + Accounting string associated with each worker job. Passed to + `#SBATCH -A` option. + walltime : str + Walltime for each worker job. + job_cpu : int + Number of cpu to book in SLURM, if None, defaults to worker threads * processes + job_mem : str + Amount of memory to request in SLURM. If None, defaults to worker processes * memory + job_extra : list + List of other Slurm options, for example -j oe. Each option will be prepended with the #SBATCH prefix. 
+ %(JobQueueCluster.parameters)s + Examples -------- >>> from pangeo import SLURMCluster - >>> cluster = SLURMCluster(project='...') + >>> cluster = SLURMCluster(processes=6, threads=4, memory="16GB", \ +env_extra=['export LANG="en_US.utf8"', \ +'export LANGUAGE="en_US.utf8"', 'export LC_ALL="en_US.utf8"']) >>> cluster.start_workers(10) # this may take a few seconds to launch >>> from dask.distributed import Client @@ -33,58 +53,72 @@ class SLURMCluster(JobQueueCluster): cancel_command = 'scancel' def __init__(self, - queue='', + queue=None, project=None, - processes=8, - memory='7GB', walltime='00:30:00', + job_cpu=None, + job_mem=None, + job_extra=[], **kwargs): - """ Initialize a SLURM Cluster - - Parameters - ---------- - queue : str - Destination queue for each worker job. - Passed to `#SBATCH -p` option. - project : str - Accounting string associated with each worker job. Passed to - `#SBATCH -A` option. - processes : int - Number of processes per node. - memory : str - Bytes of memory that the worker can use. This should be a string - like "7GB" that can be interpretted both by PBS and Dask. - walltime : str - Walltime for each worker job. 
- %(JobQueueCluster.parameters)s - """ - - super(SLURMCluster, self).__init__(processes=processes, **kwargs) - - self._header_template = """ -#SBATCH -J %(name)s -#SBATCH -n %(processes)d -#SBATCH -p %(queue)s -#SBATCH -A %(project)s -#SBATCH -t %(walltime)s -#SBATCH -e %(name)s.err -#SBATCH -o %(name)s.out - -export LANG="en_US.utf8" -export LANGUAGE="en_US.utf8" -export LC_ALL="en_US.utf8" -""".lstrip() - - memory = memory.replace(' ', '') - self.config = {'name': self.name, - 'queue': queue, - 'project': project, - 'processes': processes, - 'walltime': walltime, - # Not used - 'memory': memory - } - - self.job_header = self._header_template % self.config + + super(SLURMCluster, self).__init__(**kwargs) + + #Always ask for only one task + header_lines = [] + #SLURM header build + if self.name is not None: + header_lines.append('#SBATCH -J %s' % self.name) + header_lines.append('#SBATCH -e %s.err' % self.name) + header_lines.append('#SBATCH -o %s.out' % self.name) + if queue is not None: + header_lines.append('#SBATCH -p %s' % queue) + if project is not None: + header_lines.append('#SBATCH -A %s' % project) + + #Init resources, always 1 task, + # and then number of cpu is processes * threads if not set + header_lines.append('#SBATCH -n 1') + ncpus = job_cpu + if ncpus is None: + ncpus = self.worker_processes * self.worker_threads + header_lines.append('#SBATCH --cpus-per-task=%d' % ncpus) + #Memory + total_memory = job_mem + if job_mem is None and self.worker_memory is not None: + total_memory = slurm_format_bytes_ceil(self.worker_processes * self.worker_memory) + if total_memory is not None: + header_lines.append('#SBATCH --mem=%s' % total_memory) + + if walltime is not None: + header_lines.append('#SBATCH -t %s' % walltime) + header_lines.extend(['#SBATCH %s' % arg for arg in job_extra]) + + #Declare class attribute that shall be overriden + self.job_header = '\n'.join(header_lines) logger.debug("Job script: \n %s" % self.job_script()) + + +def 
def slurm_format_bytes_ceil(n):
    """ Format a byte count as a SLURM memory string, rounding up.

    SLURM expects KiB, MiB or GiB, but names them K, M and G.
    SLURM does not handle plain bytes: the smallest unit is K, so any
    amount below 1024 bytes is reported as '1K'.

    The value is always rounded up to the next whole unit so that the
    formatted request is never smaller than the amount asked for.

    Parameters
    ----------
    n : int
        Number of bytes.

    Returns
    -------
    str
        The amount expressed as an integer followed by 'K', 'M' or 'G'.

    Examples
    --------
    >>> slurm_format_bytes_ceil(1)
    '1K'
    >>> slurm_format_bytes_ceil(1234)
    '2K'
    >>> slurm_format_bytes_ceil(12345678)
    '12M'
    >>> slurm_format_bytes_ceil(1234567890)
    '2G'
    >>> slurm_format_bytes_ceil(15000000000)
    '14G'
    """
    # Largest unit first, so the first threshold that fits wins.
    for unit, suffix in ((1024 ** 3, 'G'), (1024 ** 2, 'M'), (1024, 'K')):
        if n >= unit:
            # Integer ceiling division: gives the same result on Python 2
            # and Python 3 (where '/' differs) and avoids float rounding.
            return '%d%s' % (-(-n // unit), suffix)
    # Below one KiB: SLURM has no byte unit, so request the minimum of 1K.
    return '1K'