From 5536bbd945e8801a93d1fcab99a5ff9b3903ca40 Mon Sep 17 00:00:00 2001 From: Guillaume Date: Tue, 27 Mar 2018 23:45:55 +0200 Subject: [PATCH 1/5] first refactor on SLURM, corrections on PBS --- dask_jobqueue/core.py | 1 + dask_jobqueue/pbs.py | 7 ++-- dask_jobqueue/slurm.py | 86 +++++++++++++++++++++++++++--------------- 3 files changed, 60 insertions(+), 34 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index c1d8f013..1eb16911 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -102,6 +102,7 @@ def __init__(self, self.worker_memory = parse_bytes(memory) self.worker_processes = processes self.worker_threads = threads + self.name = name self.jobs = dict() self.n = 0 diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 6a2ac570..44bbef32 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -67,9 +67,10 @@ def __init__(self, #Try to find a project name from environment variable project = project or os.environ.get('PBS_ACCOUNT') + header_lines = [] #PBS header build if self.name is not None: - header_lines = ['#PBS -N %s' % self.name] + header_lines.append('#PBS -N %s' % self.name) if queue is not None: header_lines.append('#PBS -q %s' % queue) if project is not None: @@ -110,7 +111,7 @@ def pbs_format_bytes_ceil(n): if n >= 10 * (1024**3): return '%dGB' % math.ceil(n / (1024**3)) if n >= 10 * (1024**2): - return '%dMB' % (n / (1024**2)) + return '%dMB' % math.ceil(n / (1024**2)) if n >= 10 * 1024: - return '%dkB' % (n / 1024) + return '%dkB' % math.ceil(n / 1024) return '%dB' % n diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 5c3c0c87..9021631e 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -1,6 +1,7 @@ import logging import os import sys +import math from .core import JobQueueCluster, docstrings @@ -35,9 +36,9 @@ class SLURMCluster(JobQueueCluster): def __init__(self, queue='', project=None, - processes=8, - memory='7GB', walltime='00:30:00', + job_extra=[], + env_extra=['export LANG="en_US.utf8"', 'export LANGUAGE="en_US.utf8"', 'export LC_ALL="en_US.utf8"'], **kwargs): """ Initialize a SLURM Cluster @@ -49,8 +50,6 @@ def __init__(self, project : str Accounting string associated with each worker job. Passed to `#SBATCH -A` option. - processes : int - Number of processes per node. memory : str Bytes of memory that the worker can use. This should be a string like "7GB" that can be interpretted both by PBS and Dask. @@ -59,32 +58,57 @@ def __init__(self, %(JobQueueCluster.parameters)s """ - super(SLURMCluster, self).__init__(processes=processes, **kwargs) - - self._header_template = """ -#SBATCH -J %(name)s -#SBATCH -n %(processes)d -#SBATCH -p %(queue)s -#SBATCH -A %(project)s -#SBATCH -t %(walltime)s -#SBATCH -e %(name)s.err -#SBATCH -o %(name)s.out - -export LANG="en_US.utf8" -export LANGUAGE="en_US.utf8" -export LC_ALL="en_US.utf8" -""".lstrip() - - memory = memory.replace(' ', '') - self.config = {'name': self.name, - 'queue': queue, - 'project': project, - 'processes': processes, - 'walltime': walltime, - # Not used - 'memory': memory - } - - self.job_header = self._header_template % self.config + super(SLURMCluster, self).__init__(**kwargs) + + #Always ask for only one task + header_lines = [] + #PBS header build + if self.name is not None: + header_lines.append('#SBATCH -J %s' % self.name) + header_lines.append('#SBATCH -e %s.err' % self.name) + header_lines.append('#SBATCH -o %s.out' % self.name) + if queue is not None: + header_lines.append('#SBATCH -p %s' % queue) + if project is not None: + header_lines.append('#SBATCH -A %s' % project) + + #Init resources, always 1 task, and then number of cpu is processes * threads + header_lines.append('#SBATCH -n 1') + header_lines.append('#SBATCH --cpus-per-task=%d' % self.worker_processes * self.worker_threads) + if self.worker_memory is not None: + total_memory = slurm_format_bytes_ceil(self.worker_processes * self.worker_memory) + header_lines.append('#SBATCH --mem=%d' % total_memory) + if walltime is not None: + header_lines.append('#SBATCH -t %s' % walltime) + header_lines.extend(['#SBATCH %s' % arg for arg in job_extra]) + header_lines.append('') + header_lines.extend([arg for arg in env_extra]) + + #Declare class attribute that shall be overriden + self.job_header = '\n'.join(header_lines) logger.debug("Job script: \n %s" % self.job_script()) + +def slurm_format_bytes_ceil(n): + """ Format bytes as text + SLURM expects KiB, MiB or Gib, but names it KB, MB, GB + SLURM does not handle Bytes, only starts at KB + + >>> pbs_format_bytes_ceil(1) + '1K' + >>> pbs_format_bytes_ceil(1234) + '2K' + >>> pbs_format_bytes_ceil(12345678) + '13M' + >>> pbs_format_bytes_ceil(1234567890) + '2G' + >>> pbs_format_bytes_ceil(15000000000) + '14G' + """ + if n >= (1024**3): + return '%dG' % math.ceil(n / (1024**3)) + if n >= (1024**2): + return '%dM' % math.ceil(n / (1024**2)) + if n >= 1024: + return '%dK' % math.ceil(n / 1024) + return '1K' % n \ No newline at end of file From 638407cb46482a715f524b7cd92f0ed6a4b85177 Mon Sep 17 00:00:00 2001 From: Guillaume Date: Wed, 28 Mar 2018 00:18:16 +0200 Subject: [PATCH 2/5] Fix slef.name attribute, refactor extra env out of slurm.py, complete slurm refactor --- dask_jobqueue/core.py | 8 ++++++++ dask_jobqueue/pbs.py | 2 -- dask_jobqueue/slurm.py | 46 +++++++++++++++++++----------------------- 3 files changed, 29 insertions(+), 27 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 1eb16911..0e8445df 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -42,6 +42,8 @@ class JobQueueCluster(Cluster): Dask worker local directory for file spilling. extra : str Additional arguments to pass to `dask-worker` + env_extra : list + Other commands to add to script before launching worker. kwargs : dict Additional keyword arguments to pass to `LocalCluster` @@ -63,6 +65,8 @@ class JobQueueCluster(Cluster): %(job_header)s +%(env_header)s + %(worker_command)s """.lstrip() @@ -79,6 +83,7 @@ def __init__(self, death_timeout=60, local_directory=None, extra='', + env_extra=[], **kwargs ): """ @@ -108,6 +113,8 @@ def __init__(self, self.n = 0 self._adaptive = None + self._env_header = '\n'.join(env_extra) + #dask-worker command line build self._command_template = os.path.join(dirname, 'dask-worker %s' % self.scheduler.address) if threads is not None: @@ -129,6 +136,7 @@ def __init__(self, def job_script(self): self.n += 1 return self._script_template % {'job_header': self.job_header, + 'env_header': self._env_header, 'worker_command': self._command_template % {'n': self.n} } diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 44bbef32..dc39ccf0 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -25,8 +25,6 @@ class PBSCluster(JobQueueCluster): Walltime for each worker job. job_extra : list List of other PBS options, for example -j oe. Each option will be prepended with the #PBS prefix. - local_directory : str - Dask worker local directory for file spilling. %(JobQueueCluster.parameters)s Examples diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 9021631e..36df1963 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -14,10 +14,24 @@ class SLURMCluster(JobQueueCluster): """ Launch Dask on a SLURM cluster + Parameters + ---------- + queue : str + Destination queue for each worker job. Passed to `#SBATCH -p` option. + project : str + Accounting string associated with each worker job. Passed to + `#SBATCH -A` option. + walltime : str + Walltime for each worker job. + job_extra : list + List of other Slurm options, for example -j oe. Each option will be prepended with the #SBATCH prefix. + %(JobQueueCluster.parameters)s + Examples -------- >>> from pangeo import SLURMCluster - >>> cluster = SLURMCluster(project='...') + >>> cluster = SLURMCluster(env_extra=['export LANG="en_US.utf8"', \ + 'export LANGUAGE="en_US.utf8"', 'export LC_ALL="en_US.utf8"']) >>> cluster.start_workers(10) # this may take a few seconds to launch >>> from dask.distributed import Client @@ -34,29 +48,11 @@ class SLURMCluster(JobQueueCluster): cancel_command = 'scancel' def __init__(self, - queue='', + queue=None, project=None, walltime='00:30:00', job_extra=[], - env_extra=['export LANG="en_US.utf8"', 'export LANGUAGE="en_US.utf8"', 'export LC_ALL="en_US.utf8"'], **kwargs): - """ Initialize a SLURM Cluster - - Parameters - ---------- - queue : str - Destination queue for each worker job. - Passed to `#SBATCH -p` option. - project : str - Accounting string associated with each worker job. Passed to - `#SBATCH -A` option. - memory : str - Bytes of memory that the worker can use. This should be a string - like "7GB" that can be interpretted both by PBS and Dask. - walltime : str - Walltime for each worker job. - %(JobQueueCluster.parameters)s - """ super(SLURMCluster, self).__init__(**kwargs) @@ -74,21 +70,21 @@ def __init__(self, #Init resources, always 1 task, and then number of cpu is processes * threads header_lines.append('#SBATCH -n 1') - header_lines.append('#SBATCH --cpus-per-task=%d' % self.worker_processes * self.worker_threads) + ncpus = self.worker_processes * self.worker_threads + header_lines.append('#SBATCH --cpus-per-task=%d' % ncpus) if self.worker_memory is not None: total_memory = slurm_format_bytes_ceil(self.worker_processes * self.worker_memory) - header_lines.append('#SBATCH --mem=%d' % total_memory) + header_lines.append('#SBATCH --mem=%s' % total_memory) if walltime is not None: header_lines.append('#SBATCH -t %s' % walltime) header_lines.extend(['#SBATCH %s' % arg for arg in job_extra]) - header_lines.append('') - header_lines.extend([arg for arg in env_extra]) #Declare class attribute that shall be overriden self.job_header = '\n'.join(header_lines) logger.debug("Job script: \n %s" % self.job_script()) + def slurm_format_bytes_ceil(n): """ Format bytes as text SLURM expects KiB, MiB or Gib, but names it KB, MB, GB @@ -111,4 +107,4 @@ def slurm_format_bytes_ceil(n): return '%dM' % math.ceil(n / (1024**2)) if n >= 1024: return '%dK' % math.ceil(n / 1024) - return '1K' % n \ No newline at end of file + return '1K' % n From f0bb957d6b350561f3a0998d62a313250df8625b Mon Sep 17 00:00:00 2001 From: Guillaume Eynard-Bontemps Date: Thu, 29 Mar 2018 12:55:57 +0000 Subject: [PATCH 3/5] Removing PBS terms from slurm.py --- dask_jobqueue/slurm.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 36df1963..03654b75 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -58,7 +58,7 @@ def __init__(self, #Always ask for only one task header_lines = [] - #PBS header build + #SLURM header build if self.name is not None: header_lines.append('#SBATCH -J %s' % self.name) header_lines.append('#SBATCH -e %s.err' % self.name) @@ -90,15 +90,15 @@ def slurm_format_bytes_ceil(n): SLURM expects KiB, MiB or Gib, but names it KB, MB, GB SLURM does not handle Bytes, only starts at KB - >>> pbs_format_bytes_ceil(1) + >>> slurm_format_bytes_ceil(1) '1K' - >>> pbs_format_bytes_ceil(1234) + >>> slurm_format_bytes_ceil(1234) '2K' - >>> pbs_format_bytes_ceil(12345678) + >>> slurm_format_bytes_ceil(12345678) '13M' - >>> pbs_format_bytes_ceil(1234567890) + >>> slurm_format_bytes_ceil(1234567890) '2G' - >>> pbs_format_bytes_ceil(15000000000) + >>> slurm_format_bytes_ceil(15000000000) '14G' """ if n >= (1024**3): From 68f4fdef46cff6b8f07ace4818b1a340c265897a Mon Sep 17 00:00:00 2001 From: guillaumeeb Date: Sat, 31 Mar 2018 11:38:50 +0200 Subject: [PATCH 4/5] lower default reserved cpus, add example on slurm on how to specify thread and processes --- README.rst | 2 +- dask_jobqueue/core.py | 6 +++--- dask_jobqueue/slurm.py | 5 +++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/README.rst b/README.rst index db3aee22..edd5b615 100644 --- a/README.rst +++ b/README.rst @@ -10,7 +10,7 @@ Example from dask_jobqueue import PBSCluster - cluster = PBSCluster(project='...') + cluster = PBSCluster(processes=6, threads=4, memory="16GB") cluster.start_workers(10) from dask.distributed import Client diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 0e8445df..7806c937 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -76,9 +76,9 @@ class JobQueueCluster(Cluster): def __init__(self, name='dask-worker', - threads=4, - processes=6, - memory='16GB', + threads=2, + processes=4, + memory='8GB', interface=None, death_timeout=60, local_directory=None, diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 03654b75..1bf21e9d 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -30,8 +30,9 @@ class SLURMCluster(JobQueueCluster): Examples -------- >>> from pangeo import SLURMCluster - >>> cluster = SLURMCluster(env_extra=['export LANG="en_US.utf8"', \ - 'export LANGUAGE="en_US.utf8"', 'export LC_ALL="en_US.utf8"']) + >>> cluster = SLURMCluster(processes=6, threads=4, memory="16GB", \ +env_extra=['export LANG="en_US.utf8"', \ +'export LANGUAGE="en_US.utf8"', 'export LC_ALL="en_US.utf8"']) >>> cluster.start_workers(10) # this may take a few seconds to launch >>> from dask.distributed import Client From 44e24e39370b4393ebb1964cf8e5bc835e23bfc8 Mon Sep 17 00:00:00 2001 From: guillaumeeb Date: Sat, 31 Mar 2018 11:50:50 +0200 Subject: [PATCH 5/5] add possibility to specify ncpus and mem for SLURM SBatch --- dask_jobqueue/slurm.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 1bf21e9d..256dec5c 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -23,6 +23,10 @@ class SLURMCluster(JobQueueCluster): `#SBATCH -A` option. walltime : str Walltime for each worker job. + job_cpu : int + Number of cpu to book in SLURM, if None, defaults to worker threads * processes + job_mem : str + Amount of memory to request in SLURM. If None, defaults to worker processes * memory job_extra : list List of other Slurm options, for example -j oe. Each option will be prepended with the #SBATCH prefix. %(JobQueueCluster.parameters)s @@ -52,6 +56,8 @@ def __init__(self, queue=None, project=None, walltime='00:30:00', + job_cpu=None, + job_mem=None, job_extra=[], **kwargs): @@ -69,13 +75,20 @@ def __init__(self, if project is not None: header_lines.append('#SBATCH -A %s' % project) - #Init resources, always 1 task, and then number of cpu is processes * threads + #Init resources, always 1 task, + # and then number of cpu is processes * threads if not set header_lines.append('#SBATCH -n 1') - ncpus = self.worker_processes * self.worker_threads + ncpus = job_cpu + if ncpus is None: + ncpus = self.worker_processes * self.worker_threads header_lines.append('#SBATCH --cpus-per-task=%d' % ncpus) - if self.worker_memory is not None: + #Memory + total_memory = job_mem + if job_mem is None and self.worker_memory is not None: total_memory = slurm_format_bytes_ceil(self.worker_processes * self.worker_memory) + if total_memory is not None: header_lines.append('#SBATCH --mem=%s' % total_memory) + if walltime is not None: header_lines.append('#SBATCH -t %s' % walltime) header_lines.extend(['#SBATCH %s' % arg for arg in job_extra])