Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Example

from dask_jobqueue import PBSCluster

cluster = PBSCluster(project='...')
cluster = PBSCluster(processes=6, threads=4, memory="16GB")
cluster.start_workers(10)

from dask.distributed import Client
Expand Down
15 changes: 12 additions & 3 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class JobQueueCluster(Cluster):
Dask worker local directory for file spilling.
extra : str
Additional arguments to pass to `dask-worker`
env_extra : list
Other commands to add to script before launching worker.
kwargs : dict
Additional keyword arguments to pass to `LocalCluster`

Expand All @@ -63,6 +65,8 @@ class JobQueueCluster(Cluster):

%(job_header)s

%(env_header)s

%(worker_command)s
""".lstrip()

Expand All @@ -72,13 +76,14 @@ class JobQueueCluster(Cluster):

def __init__(self,
name='dask-worker',
threads=4,
processes=6,
memory='16GB',
threads=2,
processes=4,
memory='8GB',
interface=None,
death_timeout=60,
local_directory=None,
extra='',
env_extra=[],
**kwargs
):
"""
Expand All @@ -102,11 +107,14 @@ def __init__(self,
self.worker_memory = parse_bytes(memory)
self.worker_processes = processes
self.worker_threads = threads
self.name = name

self.jobs = dict()
self.n = 0
self._adaptive = None

self._env_header = '\n'.join(env_extra)

#dask-worker command line build
self._command_template = os.path.join(dirname, 'dask-worker %s' % self.scheduler.address)
if threads is not None:
Expand All @@ -128,6 +136,7 @@ def __init__(self,
def job_script(self):
self.n += 1
return self._script_template % {'job_header': self.job_header,
'env_header': self._env_header,
'worker_command': self._command_template % {'n': self.n}
}

Expand Down
9 changes: 4 additions & 5 deletions dask_jobqueue/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ class PBSCluster(JobQueueCluster):
Walltime for each worker job.
job_extra : list
List of other PBS options, for example -j oe. Each option will be prepended with the #PBS prefix.
local_directory : str
Dask worker local directory for file spilling.
%(JobQueueCluster.parameters)s

Examples
Expand Down Expand Up @@ -67,9 +65,10 @@ def __init__(self,
#Try to find a project name from environment variable
project = project or os.environ.get('PBS_ACCOUNT')

header_lines = []
#PBS header build
if self.name is not None:
header_lines = ['#PBS -N %s' % self.name]
header_lines.append('#PBS -N %s' % self.name)
if queue is not None:
header_lines.append('#PBS -q %s' % queue)
if project is not None:
Expand Down Expand Up @@ -110,7 +109,7 @@ def pbs_format_bytes_ceil(n):
if n >= 10 * (1024**3):
return '%dGB' % math.ceil(n / (1024**3))
if n >= 10 * (1024**2):
return '%dMB' % (n / (1024**2))
return '%dMB' % math.ceil(n / (1024**2))
if n >= 10 * 1024:
return '%dkB' % (n / 1024)
return '%dkB' % math.ceil(n / 1024)
return '%dB' % n
136 changes: 85 additions & 51 deletions dask_jobqueue/slurm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import sys
import math

from .core import JobQueueCluster, docstrings

Expand All @@ -13,10 +14,29 @@
class SLURMCluster(JobQueueCluster):
""" Launch Dask on a SLURM cluster

Parameters
----------
queue : str
Destination queue for each worker job. Passed to `#SBATCH -p` option.
project : str
Accounting string associated with each worker job. Passed to
`#SBATCH -A` option.
walltime : str
Walltime for each worker job.
job_cpu : int
Number of cpu to book in SLURM, if None, defaults to worker threads * processes
job_mem : str
Amount of memory to request in SLURM. If None, defaults to worker processes * memory
job_extra : list
List of other Slurm options, for example -j oe. Each option will be prepended with the #SBATCH prefix.
%(JobQueueCluster.parameters)s

Examples
--------
>>> from pangeo import SLURMCluster
>>> cluster = SLURMCluster(project='...')
>>> cluster = SLURMCluster(processes=6, threads=4, memory="16GB", \
env_extra=['export LANG="en_US.utf8"', \
'export LANGUAGE="en_US.utf8"', 'export LC_ALL="en_US.utf8"'])
>>> cluster.start_workers(10) # this may take a few seconds to launch

>>> from dask.distributed import Client
Expand All @@ -33,58 +53,72 @@ class SLURMCluster(JobQueueCluster):
cancel_command = 'scancel'

def __init__(self,
queue='',
queue=None,
project=None,
processes=8,
memory='7GB',
walltime='00:30:00',
job_cpu=None,
job_mem=None,
job_extra=[],
**kwargs):
""" Initialize a SLURM Cluster

Parameters
----------
queue : str
Destination queue for each worker job.
Passed to `#SBATCH -p` option.
project : str
Accounting string associated with each worker job. Passed to
`#SBATCH -A` option.
processes : int
Number of processes per node.
memory : str
Bytes of memory that the worker can use. This should be a string
like "7GB" that can be interpretted both by PBS and Dask.
walltime : str
Walltime for each worker job.
%(JobQueueCluster.parameters)s
"""

super(SLURMCluster, self).__init__(processes=processes, **kwargs)

self._header_template = """
#SBATCH -J %(name)s
#SBATCH -n %(processes)d
#SBATCH -p %(queue)s
#SBATCH -A %(project)s
#SBATCH -t %(walltime)s
#SBATCH -e %(name)s.err
#SBATCH -o %(name)s.out

export LANG="en_US.utf8"
export LANGUAGE="en_US.utf8"
export LC_ALL="en_US.utf8"
""".lstrip()

memory = memory.replace(' ', '')
self.config = {'name': self.name,
'queue': queue,
'project': project,
'processes': processes,
'walltime': walltime,
# Not used
'memory': memory
}

self.job_header = self._header_template % self.config

super(SLURMCluster, self).__init__(**kwargs)

#Always ask for only one task
header_lines = []
#SLURM header build
if self.name is not None:
header_lines.append('#SBATCH -J %s' % self.name)
header_lines.append('#SBATCH -e %s.err' % self.name)
header_lines.append('#SBATCH -o %s.out' % self.name)
if queue is not None:
header_lines.append('#SBATCH -p %s' % queue)
if project is not None:
header_lines.append('#SBATCH -A %s' % project)

#Init resources, always 1 task,
# and then number of cpu is processes * threads if not set
header_lines.append('#SBATCH -n 1')
ncpus = job_cpu
if ncpus is None:
ncpus = self.worker_processes * self.worker_threads
header_lines.append('#SBATCH --cpus-per-task=%d' % ncpus)
#Memory
total_memory = job_mem
if job_mem is None and self.worker_memory is not None:
total_memory = slurm_format_bytes_ceil(self.worker_processes * self.worker_memory)
if total_memory is not None:
header_lines.append('#SBATCH --mem=%s' % total_memory)

if walltime is not None:
header_lines.append('#SBATCH -t %s' % walltime)
header_lines.extend(['#SBATCH %s' % arg for arg in job_extra])

#Declare class attribute that shall be overriden
self.job_header = '\n'.join(header_lines)

logger.debug("Job script: \n %s" % self.job_script())


def slurm_format_bytes_ceil(n):
""" Format bytes as text
SLURM expects KiB, MiB or Gib, but names it KB, MB, GB
SLURM does not handle Bytes, only starts at KB

>>> slurm_format_bytes_ceil(1)
'1K'
>>> slurm_format_bytes_ceil(1234)
'2K'
>>> slurm_format_bytes_ceil(12345678)
'13M'
>>> slurm_format_bytes_ceil(1234567890)
'2G'
>>> slurm_format_bytes_ceil(15000000000)
'14G'
"""
if n >= (1024**3):
return '%dG' % math.ceil(n / (1024**3))
if n >= (1024**2):
return '%dM' % math.ceil(n / (1024**2))
if n >= 1024:
return '%dK' % math.ceil(n / 1024)
return '1K' % n