Skip to content
Merged
118 changes: 111 additions & 7 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,134 @@
from contextlib import contextmanager
import logging
import subprocess
import toolz
import socket
import os
import sys
import docrep

from distributed.utils import tmpfile, ignoring
from distributed.utils import tmpfile, ignoring, get_ip_interface, parse_bytes
from distributed import LocalCluster

dirname = os.path.dirname(sys.executable)

logger = logging.getLogger(__name__)
docstrings = docrep.DocstringProcessor()


@docstrings.get_sectionsf('JobQueueCluster')
class JobQueueCluster(object):
""" Base class to launch Dask Clusters for Job queues

This class should not be used directly, use inherited class appropriate
for your queueing system (e.g. PBScluster or SLURMCluster)

Parameters
----------
name : str
Name of Dask workers.
threads : int
Number of threads per process.
processes : int
Number of processes per node.
memory : str
Bytes of memory that the worker can use. This should be a string
like "7GB" that can be interpretted both by PBS and Dask.
interface : str
Network interface like 'eth0' or 'ib0'.
death_timeout : float
Seconds to wait for a scheduler before closing workers
local_directory : str
Dask worker local directory for file spilling.
extra : str
Additional arguments to pass to `dask-worker`
kwargs : dict
Additional keyword arguments to pass to `LocalCluster`

Attributes
----------
submit_command: str
Abstract attribute for job scheduler submit command, should be overriden
cancel_command: str
Abstract attribute for job scheduler cancel command, should be overriden

See Also
--------
PBSCluster
SLURMCluster
"""
def __init__(self):
raise NotImplemented

_script_template = """
#!/bin/bash

%(job_header)s

%(worker_command)s
""".lstrip()

#Following class attributes should be overriden by extending classes.
submit_command = None
cancel_command = None

def __init__(self,
name='dask-worker',
threads=4,
processes=6,
memory='16GB',
interface=None,
death_timeout=60,
local_directory=None,
extra='',
**kwargs
):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add these options to the individual cluster doc strings?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I get your point, where do you want to put them? They are already in the JobQueueCluster doc strings. You wan to put it in every implementation?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point was just that the JobQueueCluster doc string doesn't show up on the PBSCluster doc string so it requires some unnecessary searching to find all the possible arguments to the constructor.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was already some duplicated doc, I updated it.
Does not feel really good about that though, don't like to duplicate docstrings... Is there a better way?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could directly take the class docs and add to them. i.e. in each child class definition you do something like:

        doc = JobQueueCluster.__doc__
        self.__doc__ +=  "specific docs for Slurm cluster"

Perhaps there's a less hacky way though.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rabernat - do I recall you introducing the xarray devs to a decorator approach to doc string templating? I can't seem to dig it up but perhaps someone knows what I'm talking about.

"""
This initializer should be considered as Abstract, and never used directly.
"""
if not self.cancel_command or not self.submit_command:
raise NotImplementedError('JobQueueCluster is an abstract class that should not be instanciated.')

#This attribute should be overriden
self.job_header = None

if interface:
host = get_ip_interface(interface)
extra += ' --interface %s ' % interface
else:
host = socket.gethostname()

self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs)

#Keep information on process, threads and memory, for use in subclasses
self.worker_memory = parse_bytes(memory)
self.worker_processes = processes
self.worker_threads = threads

self.jobs = dict()
self.n = 0
self._adaptive = None

#dask-worker command line build
self._command_template = os.path.join(dirname, 'dask-worker %s' % self.scheduler.address)
if threads is not None:
self._command_template += " --nthreads %d" % threads
if processes is not None:
self._command_template += " --nprocs %d" % processes
if memory is not None:
self._command_template += " --memory-limit %s" % memory
if name is not None:
self._command_template += " --name %s" % name
self._command_template += "-%(n)d" #Keep %(n) to be replaced later.
if death_timeout is not None:
self._command_template += " --death-timeout %s" % death_timeout
if local_directory is not None:
self._command_template += " --local-directory %s" % local_directory
if extra is not None:
self._command_template += extra

def job_script(self):
self.n += 1
return self._template % toolz.merge(self.config, {'n': self.n})
return self._script_template % {'job_header': self.job_header,
'worker_command': self._command_template % {'n': self.n}
}

@contextmanager
def job_file(self):
Expand All @@ -39,7 +143,7 @@ def start_workers(self, n=1):
workers = []
for _ in range(n):
with self.job_file() as fn:
out = self._call([self._submitcmd, fn])
out = self._call([self.submit_command, fn])
job = out.decode().split('.')[0]
self.jobs[self.n] = job
workers.append(self.n)
Expand Down Expand Up @@ -99,7 +203,7 @@ def stop_workers(self, workers):
return
workers = list(map(int, workers))
jobs = [self.jobs[w] for w in workers]
self._call([self._cancelcmd] + list(jobs))
self._call([self.cancel_command] + list(jobs))
for w in workers:
with ignoring(KeyError):
del self.jobs[w]
Expand Down
153 changes: 72 additions & 81 deletions dask_jobqueue/pbs.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,38 @@
import logging
import os
import socket
import sys
import math

from distributed import LocalCluster
from distributed.utils import get_ip_interface

from .core import JobQueueCluster
from .core import JobQueueCluster, docstrings

logger = logging.getLogger(__name__)

dirname = os.path.dirname(sys.executable)


@docstrings.with_indent(4)
class PBSCluster(JobQueueCluster):
""" Launch Dask on a PBS cluster

Parameters
----------
name : str
Name of worker jobs. Passed to `$PBS -N` option.
queue : str
Destination queue for each worker job. Passed to `#PBS -q` option.
project : str
Accounting string associated with each worker job. Passed to
`#PBS -A` option.
threads_per_worker : int
Number of threads per process.
processes : int
Number of processes per node.
memory : str
Bytes of memory that the worker can use. This should be a string
like "7GB" that can be interpretted both by PBS and Dask.
resource_spec : str
Request resources and specify job placement. Passed to `#PBS -l`
option.
walltime : str
Walltime for each worker job.
interface : str
Network interface like 'eth0' or 'ib0'.
death_timeout : float
Seconds to wait for a scheduler before closing workers
extra : str
Additional arguments to pass to `dask-worker`
kwargs : dict
Additional keyword arguments to pass to `LocalCluster`
job_extra : list
List of other PBS options, for example -j oe. Each option will be prepended with the #PBS prefix.
local_directory : str
Dask worker local directory for file spilling.
%(JobQueueCluster.parameters)s

Examples
--------
>>> from dask_jobqueue import PBSCluster
>>> cluster = PBSCluster(project='...')
>>> cluster = PBSCluster(queue='regular', project='DaskOnPBS')
>>> cluster.start_workers(10) # this may take a few seconds to launch

>>> from dask.distributed import Client
Expand All @@ -59,67 +42,75 @@ class PBSCluster(JobQueueCluster):
kill workers based on load.

>>> cluster.adapt()

It is a good practice to define local_directory to your PBS system scratch directory,
and you should specify resource_spec according to the processes and threads asked:
>>> cluster = PBSCluster(queue='regular', project='DaskOnPBS',local_directory=os.getenv('TMPDIR', '/tmp'), \
threads=4, processes=6, memory='16GB', resource_spec='select=1:ncpus=24:mem=100GB')
"""

#Override class variables
submit_command = 'qsub'
cancel_command = 'qdel'

def __init__(self,
name='dask',
queue='regular',
queue=None,
project=None,
resource_spec='select=1:ncpus=36:mem=109GB',
threads_per_worker=4,
processes=9,
memory='7GB',
resource_spec=None,
walltime='00:30:00',
interface=None,
death_timeout=60,
extra='',
job_extra=[],
**kwargs):
self._template = """
#!/bin/bash

#PBS -N %(name)s
#PBS -q %(queue)s
#PBS -A %(project)s
#PBS -l %(resource_spec)s
#PBS -l walltime=%(walltime)s
#PBS -j oe

%(base_path)s/dask-worker %(scheduler)s \
--nthreads %(threads_per_worker)d \
--nprocs %(processes)s \
--memory-limit %(memory)s \
--name %(name)s-%(n)d \
--death-timeout %(death_timeout)s \
%(extra)s
""".lstrip()

if interface:
host = get_ip_interface(interface)
extra += ' --interface %s ' % interface
else:
host = socket.gethostname()

#Instantiate args and parameters from parent abstract class
super(PBSCluster, self).__init__(**kwargs)

#Try to find a project name from environment variable
project = project or os.environ.get('PBS_ACCOUNT')
if not project:
raise ValueError("Must specify a project like `project='UCLB1234' "
"or set PBS_ACCOUNT environment variable")
self.cluster = LocalCluster(n_workers=0, ip=host, **kwargs)
memory = memory.replace(' ', '')
self.config = {'name': name,
'queue': queue,
'project': project,
'threads_per_worker': threads_per_worker,
'processes': processes,
'walltime': walltime,
'scheduler': self.scheduler.address,
'resource_spec': resource_spec,
'base_path': dirname,
'memory': memory,
'death_timeout': death_timeout,
'extra': extra}
self.jobs = dict()
self.n = 0
self._adaptive = None
self._submitcmd = 'qsub'
self._cancelcmd = 'qdel'

#PBS header build
if self.name is not None:
header_lines = ['#PBS -N %s' % self.name]
if queue is not None:
header_lines.append('#PBS -q %s' % queue)
if project is not None:
header_lines.append('#PBS -A %s' % project)
if resource_spec is None:
#Compute default resources specifications
ncpus = self.worker_processes * self.worker_threads
memory_string = pbs_format_bytes_ceil(self.worker_memory * self.worker_processes)
resource_spec = "select=1:ncpus=%d:mem=%s" % (ncpus, memory_string)
logger.info("Resource specification for PBS not set, initializing it to %s" % resource_spec)
header_lines.append('#PBS -l %s' % resource_spec)
if walltime is not None:
header_lines.append('#PBS -l walltime=%s' % walltime)
header_lines.extend(['#PBS %s' % arg for arg in job_extra])

#Declare class attribute that shall be overriden
self.job_header = '\n'.join(header_lines)

logger.debug("Job script: \n %s" % self.job_script())


def pbs_format_bytes_ceil(n):
""" Format bytes as text
PBS expects KiB, MiB or Gib, but names it KB, MB, GB
Whereas Dask makes the difference between KB and KiB

>>> pbs_format_bytes_ceil(1)
'1B'
>>> pbs_format_bytes_ceil(1234)
'1234B'
>>> pbs_format_bytes_ceil(12345678)
'13MB'
>>> pbs_format_bytes_ceil(1234567890)
'1177MB'
>>> pbs_format_bytes_ceil(15000000000)
'14GB'
"""
if n >= 10 * (1024**3):
return '%dGB' % math.ceil(n / (1024**3))
if n >= 10 * (1024**2):
return '%dMB' % (n / (1024**2))
if n >= 10 * 1024:
return '%dkB' % (n / 1024)
return '%dB' % n
Loading