Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ def _job_id_from_submit_output(self, out):
job_id = match.groupdict().get('job_id')
if job_id is None:
msg = ("You need to use a 'job_id' named group in your regexp, e.g. "
"r'(?P<job_id>\d+)', in your regexp. Your regexp was: "
"r'(?P<job_id>\\d+)', in your regexp. Your regexp was: "
"{!r}".format(self.job_id_regexp))
raise ValueError(msg)

Expand Down
86 changes: 55 additions & 31 deletions dask_jobqueue/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@


class LSFCluster(JobQueueCluster):
__doc__ = docstrings.with_indents(""" Launch Dask on a LSF cluster
__doc__ = docstrings.with_indents(
""" Launch Dask on a LSF cluster

Parameters
----------
Expand All @@ -22,8 +23,11 @@ class LSFCluster(JobQueueCluster):
`#BSUB -P` option.
ncpus : int
Number of cpus. Passed to `#BSUB -n` option.
ncpus_per_host : int
Indicates the number of processors on each host that should be allocated to the job.
Passed to `-R "span[ptile=ncpus_per_host]"`.
mem : int
Request memory in bytes. Passed to `#BSUB -M` option.
Request memory per job in bytes. Passed to `#BSUB -M` option.
walltime : str
Walltime for each worker job in HH:MM. Passed to `#BSUB -W` option.
job_extra : list
Expand All @@ -45,68 +49,88 @@ class LSFCluster(JobQueueCluster):
kill workers based on load.

>>> cluster.adapt()
""", 4)
""",
4,
)

# Override class variables
submit_command = 'bsub <'
cancel_command = 'bkill'
scheduler_name = 'lsf'

def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None, job_extra=None, **kwargs):
submit_command = "bsub <"
cancel_command = "bkill"
scheduler_name = "lsf"

def __init__(
    self,
    queue=None,
    project=None,
    ncpus=None,
    ncpus_per_host=None,
    mem=None,
    walltime=None,
    job_extra=None,
    **kwargs
):
    """Assemble the ``#BSUB`` header used for every LSF worker job.

    Each of ``queue``, ``project``, ``ncpus``, ``mem``, ``walltime`` and
    ``job_extra`` falls back to the ``jobqueue.lsf.*`` dask configuration
    entry when left as ``None``.  ``ncpus_per_host`` has no configuration
    fallback: the ``span[ptile=...]`` line is only emitted when the caller
    passes a value explicitly.

    Parameters
    ----------
    queue : str, optional
        Destination queue, written as ``#BSUB -q``.
    project : str, optional
        Project name, written as ``#BSUB -P``.
    ncpus : int, optional
        Number of cpus, written as ``#BSUB -n``.  Defaults to
        ``self.worker_cores`` when neither the argument nor the
        configuration provides a value.
    ncpus_per_host : int, optional
        Processors per host, written as ``#BSUB -R "span[ptile=N]"``.
    mem : int, optional
        Memory in bytes, converted with ``lsf_format_bytes_ceil`` and
        written as ``#BSUB -M``.  Defaults to ``self.worker_memory``.
    walltime : str, optional
        Walltime, written as ``#BSUB -W``.
    job_extra : list, optional
        Extra options; each entry becomes its own ``#BSUB <entry>`` line.
    **kwargs
        Forwarded unchanged to the parent class initializer.
    """
    if queue is None:
        queue = dask.config.get("jobqueue.%s.queue" % self.scheduler_name)
    if project is None:
        project = dask.config.get("jobqueue.%s.project" % self.scheduler_name)
    if ncpus is None:
        ncpus = dask.config.get("jobqueue.%s.ncpus" % self.scheduler_name)
    if mem is None:
        mem = dask.config.get("jobqueue.%s.mem" % self.scheduler_name)
    if walltime is None:
        walltime = dask.config.get("jobqueue.%s.walltime" % self.scheduler_name)
    if job_extra is None:
        job_extra = dask.config.get("jobqueue.%s.job-extra" % self.scheduler_name)

    # Instantiate args and parameters from parent abstract class
    super(LSFCluster, self).__init__(**kwargs)

    header_lines = []
    # LSF header build
    if self.name is not None:
        header_lines.append("#BSUB -J %s" % self.name)
    if self.log_directory is not None:
        # "%%J" yields a literal "%J" placeholder in the generated header.
        log_stem = "%s/%s-%%J" % (self.log_directory, self.name or "worker")
        header_lines.append("#BSUB -e %s.err" % log_stem)
        header_lines.append("#BSUB -o %s.out" % log_stem)
    if queue is not None:
        header_lines.append("#BSUB -q %s" % queue)
    if project is not None:
        header_lines.append("#BSUB -P %s" % project)
    if ncpus is None:
        # Compute default cores specifications
        ncpus = self.worker_cores
        logger.info(
            "ncpus specification for LSF not set, initializing it to %s", ncpus
        )
    if ncpus is not None:
        header_lines.append("#BSUB -n %s" % ncpus)
    if ncpus_per_host is not None:
        # %-formatting (not an f-string) keeps this consistent with the rest
        # of the module, which avoids syntax newer than the file relies on.
        header_lines.append('#BSUB -R "span[ptile=%s]"' % ncpus_per_host)
    if mem is None:
        # Compute default memory specifications
        mem = self.worker_memory
        logger.info(
            "mem specification for LSF not set, initializing it to %s", mem
        )
    if mem is not None:
        memory_string = lsf_format_bytes_ceil(mem)
        header_lines.append("#BSUB -M %s" % memory_string)
    if walltime is not None:
        header_lines.append("#BSUB -W %s" % walltime)
    # The configuration entry may itself be None; treat that as "no extras".
    header_lines.extend(["#BSUB %s" % arg for arg in job_extra or []])
    # Shell expansion: strip the shortest trailing ".suffix" from LSB_JOBID.
    header_lines.append("JOB_ID=${LSB_JOBID%.*}")

    # Declare class attribute that shall be overridden
    self.job_header = "\n".join(header_lines)

    logger.debug("Job script: \n %s", self.job_script())

def _submit_job(self, script_filename):
    """Submit ``script_filename`` through the shell and return the result.

    ``submit_command`` ends with a ``<`` redirection, so the script is fed
    to the submit program on stdin; that is why the command is handed to
    ``self._call`` as a single shell string with ``shell=True``.  Anything
    the submit program writes to stderr is discarded.
    """
    shell_line = " ".join((self.submit_command, script_filename, "2> /dev/null"))
    return self._call([shell_line], shell=True)


Expand All @@ -125,4 +149,4 @@ def lsf_format_bytes_ceil(n):
>>> lsf_format_bytes_ceil(1234567890)
'1235'
"""
return '%d' % math.ceil(n / (1000**2))
return "%d" % math.ceil(n / (1000 ** 2))
82 changes: 50 additions & 32 deletions dask_jobqueue/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@


class PBSCluster(JobQueueCluster):
__doc__ = docstrings.with_indents(""" Launch Dask on a PBS cluster
__doc__ = docstrings.with_indents(
""" Launch Dask on a PBS cluster

Parameters
----------
Expand Down Expand Up @@ -48,57 +49,74 @@ class PBSCluster(JobQueueCluster):
>>> cluster = PBSCluster(queue='regular', project='DaskOnPBS',
... local_directory='$TMPDIR',
... cores=24, processes=6, memory='100GB')
""", 4)
""",
4,
)

# Override class variables
submit_command = 'qsub'
cancel_command = 'qdel'
scheduler_name = 'pbs'

def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, job_extra=None, **kwargs):
submit_command = "qsub"
cancel_command = "qdel"
scheduler_name = "pbs"

def __init__(
    self,
    queue=None,
    project=None,
    resource_spec=None,
    walltime=None,
    job_extra=None,
    **kwargs
):
    """Assemble the ``#PBS`` header used for every PBS worker job.

    Each argument left as ``None`` falls back to the matching
    ``jobqueue.pbs.*`` dask configuration entry.  ``project`` additionally
    falls back to the ``PBS_ACCOUNT`` environment variable when neither the
    argument nor the configuration yields a truthy value.

    Parameters
    ----------
    queue : str, optional
        Destination queue, written as ``#PBS -q``.
    project : str, optional
        Account name, written as ``#PBS -A``.
    resource_spec : str, optional
        Resource request, written as ``#PBS -l``.  When unset it is built
        as ``select=1:ncpus=<worker_cores>:mem=<worker_memory>``.
    walltime : str, optional
        Walltime, written as ``#PBS -l walltime=``.
    job_extra : list, optional
        Extra options; each entry becomes its own ``#PBS <entry>`` line.
    **kwargs
        Forwarded unchanged to the parent class initializer.
    """
    if queue is None:
        queue = dask.config.get("jobqueue.%s.queue" % self.scheduler_name)
    if resource_spec is None:
        resource_spec = dask.config.get(
            "jobqueue.%s.resource-spec" % self.scheduler_name
        )
    if walltime is None:
        walltime = dask.config.get("jobqueue.%s.walltime" % self.scheduler_name)
    if job_extra is None:
        job_extra = dask.config.get("jobqueue.%s.job-extra" % self.scheduler_name)
    if project is None:
        project = dask.config.get("jobqueue.%s.project" % self.scheduler_name)

    # Instantiate args and parameters from parent abstract class
    super(PBSCluster, self).__init__(**kwargs)

    # Try to find a project name from environment variable.  Applied once,
    # to both an explicit falsy argument and an empty configuration entry.
    project = project or os.environ.get("PBS_ACCOUNT")

    header_lines = ["#!/usr/bin/env bash"]
    # PBS header build
    if self.name is not None:
        header_lines.append("#PBS -N %s" % self.name)
    if queue is not None:
        header_lines.append("#PBS -q %s" % queue)
    if project is not None:
        header_lines.append("#PBS -A %s" % project)
    if resource_spec is None:
        # Compute default resources specifications
        resource_spec = "select=1:ncpus=%d" % self.worker_cores
        memory_string = pbs_format_bytes_ceil(self.worker_memory)
        resource_spec += ":mem=" + memory_string
        logger.info(
            "Resource specification for PBS not set, initializing it to %s",
            resource_spec,
        )
    if resource_spec is not None:
        header_lines.append("#PBS -l %s" % resource_spec)
    if walltime is not None:
        header_lines.append("#PBS -l walltime=%s" % walltime)
    if self.log_directory is not None:
        header_lines.append("#PBS -e %s/" % self.log_directory)
        header_lines.append("#PBS -o %s/" % self.log_directory)
    # The configuration entry may itself be None; treat that as "no extras".
    header_lines.extend(["#PBS %s" % arg for arg in job_extra or []])
    # Shell expansion: strip the shortest trailing ".suffix" from PBS_JOBID.
    header_lines.append("JOB_ID=${PBS_JOBID%.*}")

    # Declare class attribute that shall be overridden
    self.job_header = "\n".join(header_lines)

    logger.debug("Job script: \n %s", self.job_script())

Expand All @@ -119,10 +137,10 @@ def pbs_format_bytes_ceil(n):
>>> pbs_format_bytes_ceil(15000000000)
'14GB'
"""
if n >= 10 * (1024**3):
return '%dGB' % math.ceil(n / (1024**3))
if n >= 10 * (1024**2):
return '%dMB' % math.ceil(n / (1024**2))
if n >= 10 * (1024 ** 3):
return "%dGB" % math.ceil(n / (1024 ** 3))
if n >= 10 * (1024 ** 2):
return "%dMB" % math.ceil(n / (1024 ** 2))
if n >= 10 * 1024:
return '%dkB' % math.ceil(n / 1024)
return '%dB' % n
return "%dkB" % math.ceil(n / 1024)
return "%dB" % n
Loading