From 2aba50ca72a09a4105f45a79776c02332153b752 Mon Sep 17 00:00:00 2001 From: louisabraham Date: Wed, 17 Oct 2018 12:46:45 +0200 Subject: [PATCH 1/5] black formatting --- dask_jobqueue/pbs.py | 82 +++++++++++++++++++++++++++----------------- 1 file changed, 50 insertions(+), 32 deletions(-) diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index de6b4286..bc558e3d 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -12,7 +12,8 @@ class PBSCluster(JobQueueCluster): - __doc__ = docstrings.with_indents(""" Launch Dask on a PBS cluster + __doc__ = docstrings.with_indents( + """ Launch Dask on a PBS cluster Parameters ---------- @@ -48,57 +49,74 @@ class PBSCluster(JobQueueCluster): >>> cluster = PBSCluster(queue='regular', project='DaskOnPBS', ... local_directory='$TMPDIR', ... cores=24, processes=6, memory='100GB') - """, 4) + """, + 4, + ) # Override class variables - submit_command = 'qsub' - cancel_command = 'qdel' - scheduler_name = 'pbs' - - def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, job_extra=None, **kwargs): + submit_command = "qsub" + cancel_command = "qdel" + scheduler_name = "pbs" + + def __init__( + self, + queue=None, + project=None, + resource_spec=None, + walltime=None, + job_extra=None, + **kwargs + ): if queue is None: - queue = dask.config.get('jobqueue.%s.queue' % self.scheduler_name) + queue = dask.config.get("jobqueue.%s.queue" % self.scheduler_name) if resource_spec is None: - resource_spec = dask.config.get('jobqueue.%s.resource-spec' % self.scheduler_name) + resource_spec = dask.config.get( + "jobqueue.%s.resource-spec" % self.scheduler_name + ) if walltime is None: - walltime = dask.config.get('jobqueue.%s.walltime' % self.scheduler_name) + walltime = dask.config.get("jobqueue.%s.walltime" % self.scheduler_name) if job_extra is None: - job_extra = dask.config.get('jobqueue.%s.job-extra' % self.scheduler_name) + job_extra = dask.config.get("jobqueue.%s.job-extra" % self.scheduler_name) if project is None: - project = dask.config.get('jobqueue.%s.project' % self.scheduler_name) or os.environ.get('PBS_ACCOUNT') + project = dask.config.get( + "jobqueue.%s.project" % self.scheduler_name + ) or os.environ.get("PBS_ACCOUNT") # Instantiate args and parameters from parent abstract class super(PBSCluster, self).__init__(**kwargs) # Try to find a project name from environment variable - project = project or os.environ.get('PBS_ACCOUNT') + project = project or os.environ.get("PBS_ACCOUNT") - header_lines = ['#!/usr/bin/env bash'] + header_lines = ["#!/usr/bin/env bash"] # PBS header build if self.name is not None: - header_lines.append('#PBS -N %s' % self.name) + header_lines.append("#PBS -N %s" % self.name) if queue is not None: - header_lines.append('#PBS -q %s' % queue) + header_lines.append("#PBS -q %s" % queue) if project is not None: - header_lines.append('#PBS -A %s' % project) + header_lines.append("#PBS -A %s" % project) if resource_spec is None: # Compute default resources specifications resource_spec = "select=1:ncpus=%d" % self.worker_cores memory_string = pbs_format_bytes_ceil(self.worker_memory) - resource_spec += ':mem=' + memory_string - logger.info("Resource specification for PBS not set, initializing it to %s" % resource_spec) + resource_spec += ":mem=" + memory_string + logger.info( + "Resource specification for PBS not set, initializing it to %s" + % resource_spec + ) if resource_spec is not None: - header_lines.append('#PBS -l %s' % resource_spec) + header_lines.append("#PBS -l %s" % resource_spec) if walltime is not None: - header_lines.append('#PBS -l walltime=%s' % walltime) + header_lines.append("#PBS -l walltime=%s" % walltime) if self.log_directory is not None: - header_lines.append('#PBS -e %s/' % self.log_directory) - header_lines.append('#PBS -o %s/' % self.log_directory) - header_lines.extend(['#PBS %s' % arg for arg in job_extra]) - header_lines.append('JOB_ID=${PBS_JOBID%.*}') + header_lines.append("#PBS -e %s/" % self.log_directory) + header_lines.append("#PBS -o %s/" % self.log_directory) + header_lines.extend(["#PBS %s" % arg for arg in job_extra]) + header_lines.append("JOB_ID=${PBS_JOBID%.*}") # Declare class attribute that shall be overriden - self.job_header = '\n'.join(header_lines) + self.job_header = "\n".join(header_lines) logger.debug("Job script: \n %s" % self.job_script()) @@ -119,10 +137,10 @@ def pbs_format_bytes_ceil(n): >>> pbs_format_bytes_ceil(15000000000) '14GB' """ - if n >= 10 * (1024**3): - return '%dGB' % math.ceil(n / (1024**3)) - if n >= 10 * (1024**2): - return '%dMB' % math.ceil(n / (1024**2)) + if n >= 10 * (1024 ** 3): + return "%dGB" % math.ceil(n / (1024 ** 3)) + if n >= 10 * (1024 ** 2): + return "%dMB" % math.ceil(n / (1024 ** 2)) if n >= 10 * 1024: - return '%dkB' % math.ceil(n / 1024) - return '%dB' % n + return "%dkB" % math.ceil(n / 1024) + return "%dB" % n From fb7f6873dd94acde073bc14b2f8966c918d258a4 Mon Sep 17 00:00:00 2001 From: louisabraham Date: Wed, 17 Oct 2018 12:47:16 +0200 Subject: [PATCH 2/5] black formatting --- dask_jobqueue/lsf.py | 78 +++++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 30 deletions(-) diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 5d508897..4d6a208d 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -11,7 +11,8 @@ class LSFCluster(JobQueueCluster): - __doc__ = docstrings.with_indents(""" Launch Dask on a LSF cluster + __doc__ = docstrings.with_indents( + """ Launch Dask on a LSF cluster Parameters ---------- @@ -45,26 +46,37 @@ class LSFCluster(JobQueueCluster): kill workers based on load. >>> cluster.adapt() - """, 4) + """, + 4, + ) # Override class variables - submit_command = 'bsub <' - cancel_command = 'bkill' - scheduler_name = 'lsf' - - def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None, job_extra=None, **kwargs): + submit_command = "bsub <" + cancel_command = "bkill" + scheduler_name = "lsf" + + def __init__( + self, + queue=None, + project=None, + ncpus=None, + mem=None, + walltime=None, + job_extra=None, + **kwargs + ): if queue is None: - queue = dask.config.get('jobqueue.%s.queue' % self.scheduler_name) + queue = dask.config.get("jobqueue.%s.queue" % self.scheduler_name) if project is None: - project = dask.config.get('jobqueue.%s.project' % self.scheduler_name) + project = dask.config.get("jobqueue.%s.project" % self.scheduler_name) if ncpus is None: - ncpus = dask.config.get('jobqueue.%s.ncpus' % self.scheduler_name) + ncpus = dask.config.get("jobqueue.%s.ncpus" % self.scheduler_name) if mem is None: - mem = dask.config.get('jobqueue.%s.mem' % self.scheduler_name) + mem = dask.config.get("jobqueue.%s.mem" % self.scheduler_name) if walltime is None: - walltime = dask.config.get('jobqueue.%s.walltime' % self.scheduler_name) + walltime = dask.config.get("jobqueue.%s.walltime" % self.scheduler_name) if job_extra is None: - job_extra = dask.config.get('jobqueue.%s.job-extra' % self.scheduler_name) + job_extra = dask.config.get("jobqueue.%s.job-extra" % self.scheduler_name) # Instantiate args and parameters from parent abstract class super(LSFCluster, self).__init__(**kwargs) @@ -72,41 +84,47 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None header_lines = [] # LSF header build if self.name is not None: - header_lines.append('#BSUB -J %s' % self.name) + header_lines.append("#BSUB -J %s" % self.name) if self.log_directory is not None: - header_lines.append('#BSUB -e %s/%s-%%J.err' % - (self.log_directory, self.name or 'worker')) - header_lines.append('#BSUB -o %s/%s-%%J.out' % - (self.log_directory, self.name or 'worker')) + header_lines.append( + "#BSUB -e %s/%s-%%J.err" % (self.log_directory, self.name or "worker") + ) + header_lines.append( + "#BSUB -o %s/%s-%%J.out" % (self.log_directory, self.name or "worker") + ) if queue is not None: - header_lines.append('#BSUB -q %s' % queue) + header_lines.append("#BSUB -q %s" % queue) if project is not None: - header_lines.append('#BSUB -P %s' % project) + header_lines.append("#BSUB -P %s" % project) if ncpus is None: # Compute default cores specifications ncpus = self.worker_cores - logger.info("ncpus specification for LSF not set, initializing it to %s" % ncpus) + logger.info( + "ncpus specification for LSF not set, initializing it to %s" % ncpus + ) if ncpus is not None: - header_lines.append('#BSUB -n %s' % ncpus) + header_lines.append("#BSUB -n %s" % ncpus) if mem is None: # Compute default memory specifications mem = self.worker_memory - logger.info("mem specification for LSF not set, initializing it to %s" % mem) + logger.info( + "mem specification for LSF not set, initializing it to %s" % mem + ) if mem is not None: memory_string = lsf_format_bytes_ceil(mem) - header_lines.append('#BSUB -M %s' % memory_string) + header_lines.append("#BSUB -M %s" % memory_string) if walltime is not None: - header_lines.append('#BSUB -W %s' % walltime) - header_lines.extend(['#BSUB %s' % arg for arg in job_extra]) - header_lines.append('JOB_ID=${LSB_JOBID%.*}') + header_lines.append("#BSUB -W %s" % walltime) + header_lines.extend(["#BSUB %s" % arg for arg in job_extra]) + header_lines.append("JOB_ID=${LSB_JOBID%.*}") # Declare class attribute that shall be overriden - self.job_header = '\n'.join(header_lines) + self.job_header = "\n".join(header_lines) logger.debug("Job script: \n %s" % self.job_script()) def _submit_job(self, script_filename): - piped_cmd = [self.submit_command + ' ' + script_filename + ' 2> /dev/null'] + piped_cmd = [self.submit_command + " " + script_filename + " 2> /dev/null"] return self._call(piped_cmd, shell=True) @@ -125,4 +143,4 @@ def lsf_format_bytes_ceil(n): >>> lsf_format_bytes_ceil(1234567890) '1235' """ - return '%d' % math.ceil(n / (1000**2)) + return "%d" % math.ceil(n / (1000 ** 2)) From 409915a7fa456299ff403b15ca06d1b5e27f0c1e Mon Sep 17 00:00:00 2001 From: louisabraham Date: Wed, 17 Oct 2018 13:16:36 +0200 Subject: [PATCH 3/5] black format --- dask_jobqueue/slurm.py | 84 +++++++++++++++++++++++++----------------- 1 file changed, 50 insertions(+), 34 deletions(-) diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index dfa965a7..a989803d 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -11,7 +11,8 @@ class SLURMCluster(JobQueueCluster): - __doc__ = docstrings.with_indents(""" Launch Dask on a SLURM cluster + __doc__ = docstrings.with_indents( + """ Launch Dask on a SLURM cluster Parameters ---------- @@ -45,63 +46,78 @@ class SLURMCluster(JobQueueCluster): This also works with adaptive clusters. This automatically launches and kill workers based on load. >>> cluster.adapt() - """, 4) + """, + 4, + ) # Override class variables - submit_command = 'sbatch --parsable' - cancel_command = 'scancel' - scheduler_name = 'slurm' - - def __init__(self, queue=None, project=None, walltime=None, job_cpu=None, job_mem=None, job_extra=None, **kwargs): + submit_command = "sbatch --parsable" + cancel_command = "scancel" + scheduler_name = "slurm" + + def __init__( + self, + queue=None, + project=None, + walltime=None, + job_cpu=None, + job_mem=None, + job_extra=None, + **kwargs + ): if queue is None: - queue = dask.config.get('jobqueue.slurm.queue') + queue = dask.config.get("jobqueue.slurm.queue") if project is None: - project = dask.config.get('jobqueue.slurm.project') + project = dask.config.get("jobqueue.slurm.project") if walltime is None: - walltime = dask.config.get('jobqueue.slurm.walltime') + walltime = dask.config.get("jobqueue.slurm.walltime") if job_cpu is None: - job_cpu = dask.config.get('jobqueue.slurm.job-cpu') + job_cpu = dask.config.get("jobqueue.slurm.job-cpu") if job_mem is None: - job_mem = dask.config.get('jobqueue.slurm.job-mem') + job_mem = dask.config.get("jobqueue.slurm.job-mem") if job_extra is None: - job_extra = dask.config.get('jobqueue.slurm.job-extra') + job_extra = dask.config.get("jobqueue.slurm.job-extra") super(SLURMCluster, self).__init__(**kwargs) # Always ask for only one task - header_lines = ['#!/usr/bin/env bash'] + header_lines = ["#!/usr/bin/env bash"] # SLURM header build if self.name is not None: - header_lines.append('#SBATCH -J %s' % self.name) + header_lines.append("#SBATCH -J %s" % self.name) if self.log_directory is not None: - header_lines.append('#SBATCH -e %s/%s-%%J.err' % - (self.log_directory, self.name or 'worker')) - header_lines.append('#SBATCH -o %s/%s-%%J.out' % - (self.log_directory, self.name or 'worker')) + header_lines.append( + "#SBATCH -e %s/%s-%%J.err" % (self.log_directory, self.name or "worker") + ) + header_lines.append( + "#SBATCH -o %s/%s-%%J.out" % (self.log_directory, self.name or "worker") + ) if queue is not None: - header_lines.append('#SBATCH -p %s' % queue) + header_lines.append("#SBATCH -p %s" % queue) if project is not None: - header_lines.append('#SBATCH -A %s' % project) + header_lines.append("#SBATCH -A %s" % project) # Init resources, always 1 task, # and then number of cpu is processes * threads if not set - header_lines.append('#SBATCH -n 1') - header_lines.append('#SBATCH --cpus-per-task=%d' % (job_cpu or self.worker_cores)) + header_lines.append("#SBATCH -n 1") + header_lines.append( + "#SBATCH --cpus-per-task=%d" % (job_cpu or self.worker_cores) + ) # Memory memory = job_mem if job_mem is None: memory = slurm_format_bytes_ceil(self.worker_memory) if memory is not None: - header_lines.append('#SBATCH --mem=%s' % memory) + header_lines.append("#SBATCH --mem=%s" % memory) if walltime is not None: - header_lines.append('#SBATCH -t %s' % walltime) - header_lines.extend(['#SBATCH %s' % arg for arg in job_extra]) + header_lines.append("#SBATCH -t %s" % walltime) + header_lines.extend(["#SBATCH %s" % arg for arg in job_extra]) - header_lines.append('JOB_ID=${SLURM_JOB_ID%;*}') + header_lines.append("JOB_ID=${SLURM_JOB_ID%;*}") # Declare class attribute that shall be overriden - self.job_header = '\n'.join(header_lines) + self.job_header = "\n".join(header_lines) logger.debug("Job script: \n %s" % self.job_script()) @@ -122,10 +138,10 @@ def slurm_format_bytes_ceil(n): >>> slurm_format_bytes_ceil(15000000000) '14G' """ - if n >= (1024**3): - return '%dG' % math.ceil(n / (1024**3)) - if n >= (1024**2): - return '%dM' % math.ceil(n / (1024**2)) + if n >= (1024 ** 3): + return "%dG" % math.ceil(n / (1024 ** 3)) + if n >= (1024 ** 2): + return "%dM" % math.ceil(n / (1024 ** 2)) if n >= 1024: - return '%dK' % math.ceil(n / 1024) - return '1K' % n + return "%dK" % math.ceil(n / 1024) + return "1K" % n From 5dc1f465fdd459690129e735f7153076ec30832f Mon Sep 17 00:00:00 2001 From: louisabraham Date: Wed, 17 Oct 2018 13:41:13 +0200 Subject: [PATCH 4/5] escape backslash --- dask_jobqueue/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index f81a4480..8e8b0d6e 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -484,7 +484,7 @@ def _job_id_from_submit_output(self, out): job_id = match.groupdict().get('job_id') if job_id is None: msg = ("You need to use a 'job_id' named group in your regexp, e.g. " - "r'(?P\d+)', in your regexp. Your regexp was: " + "r'(?P\\d+)', in your regexp. Your regexp was: " "{!r}".format(self.job_id_regexp)) raise ValueError(msg) From 54ce28f5ceb1681b63b346d005b7c7a86f824542 Mon Sep 17 00:00:00 2001 From: louisabraham Date: Wed, 17 Oct 2018 13:45:32 +0200 Subject: [PATCH 5/5] add ncpus_per_host option --- dask_jobqueue/lsf.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 4d6a208d..4b3f3258 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -23,8 +23,11 @@ class LSFCluster(JobQueueCluster): `#BSUB -P` option. ncpus : int Number of cpus. Passed to `#BSUB -n` option. + ncpus_per_host : int + Indicates the number of processors on each host that should be allocated to the job. + Passed to `-R "span[ptile=ncpus_per_host]"`. mem : int - Request memory in bytes. Passed to `#BSUB -M` option. + Request memory per job in bytes. Passed to `#BSUB -M` option. walltime : str Walltime for each worker job in HH:MM. Passed to `#BSUB -W` option. job_extra : list @@ -60,10 +63,11 @@ def __init__( queue=None, project=None, ncpus=None, + ncpus_per_host=None, mem=None, walltime=None, job_extra=None, - **kwargs + **kwargs, ): if queue is None: queue = dask.config.get("jobqueue.%s.queue" % self.scheduler_name) @@ -104,6 +108,8 @@ def __init__( ) if ncpus is not None: header_lines.append("#BSUB -n %s" % ncpus) + if ncpus_per_host is not None: + header_lines.append(f'#BSUB -R "span[ptile={ncpus_per_host}]"') if mem is None: # Compute default memory specifications mem = self.worker_memory