From e10cede8f84eeff5ef2f56deecedd9e5e98e1a79 Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Sun, 24 Nov 2019 00:37:32 -0600 Subject: [PATCH 01/22] jinja template files --- dask_jobqueue/templates/command | 22 ++++++++++++++++++++++ dask_jobqueue/templates/pbs_job_header | 23 +++++++++++++++++++++++ dask_jobqueue/templates/script.sh | 5 +++++ setup.py | 1 + 4 files changed, 51 insertions(+) create mode 100644 dask_jobqueue/templates/command create mode 100644 dask_jobqueue/templates/pbs_job_header create mode 100644 dask_jobqueue/templates/script.sh diff --git a/dask_jobqueue/templates/command b/dask_jobqueue/templates/command new file mode 100644 index 00000000..8edd479d --- /dev/null +++ b/dask_jobqueue/templates/command @@ -0,0 +1,22 @@ +{{python}} -m distributed.cli.dask_worker {{scheduler}} --nthreads {{threads}}{{" "}} +{%- if processes is not none and processes > 1 -%} +--nprocs {{processes}}{{" "}} +{%- endif -%} +--memory-limit {{memory}} --name {{name}}{{" "}} +{%- if nanny -%} +--nanny +{%- else -%} +--no-nanny +{%- endif -%}{{" "}} +{%- if death_timeout is not none -%} +--death-timeout {{ death_timeout }}{{" "}} +{%- endif -%} +{%- if local_directory is not none -%} +--local-directory {{ local_directory }}{{" "}} +{%- endif -%} +{%- if extra is not none -%} +{{ extra | join(" ") }}{{" "}} +{%- endif -%} +{%- if interface -%} +--interface {{ interface }}{{" "}} +{%- endif -%} diff --git a/dask_jobqueue/templates/pbs_job_header b/dask_jobqueue/templates/pbs_job_header new file mode 100644 index 00000000..8373ea85 --- /dev/null +++ b/dask_jobqueue/templates/pbs_job_header @@ -0,0 +1,23 @@ +{% if job_name is not none -%} +#PBS -N {{job_name}} +{%- endif %} +{% if queue is not none -%} +#PBS -q {{queue}} +{%- endif %} +{% if project is not none -%} +#PBS -A {{project}} +{%- endif %} +#PBS -l {{resource_spec | pbs_format_resource_spec(worker_cores, worker_memory) }} +{% if walltime is not none -%} +#PBS -l walltime={{walltime}} +{%- endif %} +{% if log_directory is not none -%} +#PBS -o {{log_directory}}/ +#PBS -e {{log_directory}}/ +{%- endif %} +{%- if job_extra is not none -%} +{%- for j in job_extra %} +#PBS {{j}} +{%- endfor -%} +{%- endif %} +JOB_ID=${PBS_JOBID%%.*} \ No newline at end of file diff --git a/dask_jobqueue/templates/script.sh b/dask_jobqueue/templates/script.sh new file mode 100644 index 00000000..61277c4f --- /dev/null +++ b/dask_jobqueue/templates/script.sh @@ -0,0 +1,5 @@ +{{shebang}} + +{{job_header}} +{{env_header}} +{{worker_command}} diff --git a/setup.py b/setup.py index e5966511..dfcf3188 100755 --- a/setup.py +++ b/setup.py @@ -23,6 +23,7 @@ python_requires=">=3.6", license="BSD 3-Clause", packages=["dask_jobqueue"], + package_data={'dask_jobqueue': ['templates/*']}, include_package_data=True, install_requires=install_requires, tests_require=["pytest >= 2.7.1"], From 41bdccbc9b0e764a9a45de654802e4dd0febc155 Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Wed, 27 Nov 2019 02:08:29 -0600 Subject: [PATCH 02/22] Resolve merge conflict by accepting both changes --- dask_jobqueue/core.py | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index f546a1cc..8510cefe 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -9,6 +9,8 @@ import weakref import abc +from jinja2 import Environment, PackageLoader + import dask from distributed.core import Status @@ -236,27 +238,19 @@ def __init__( self._env_header = "\n".join(filter(None, env_extra)) self.header_skip = set(header_skip) - # dask-worker command line build - dask_worker_command = "%(python)s -m distributed.cli.dask_worker" % dict( - python=python + self._command_template = self.template_env.get_template('command').render( + python=python, + scheduler=self.scheduler, + threads=self.worker_process_threads, + processes=processes, + memory=self.worker_process_memory, + nanny=nanny, + name=name, + death_timeout=death_timeout, + local_directory=local_directory, + interface=interface, + extra=extra, ) - command_args = [dask_worker_command, self.scheduler] - command_args += ["--nthreads", self.worker_process_threads] - if processes is not None and processes > 1: - command_args += ["--nprocs", processes] - - command_args += ["--memory-limit", self.worker_process_memory] - command_args += ["--name", str(name)] - command_args += ["--nanny" if nanny else "--no-nanny"] - - if death_timeout is not None: - command_args += ["--death-timeout", death_timeout] - if local_directory is not None: - command_args += ["--local-directory", local_directory] - if extra is not None: - command_args += extra - - self._command_template = " ".join(map(str, command_args)) self.log_directory = log_directory if self.log_directory is not None: @@ -275,6 +269,13 @@ def default_config_name(cls): ) return config_name + @property + def template_env(self): + """ + Jinja2 template rendering environment + """ + return Environment(loader=PackageLoader('dask_jobqueue', 'templates')) + def job_script(self): """Construct a job submission script""" header = "\n".join( @@ -290,7 +291,7 @@ def job_script(self): "env_header": self._env_header, "worker_command": self._command_template, } - return self._script_template % pieces + return self.template_env.get_template('script.sh').render(**pieces) @contextmanager def job_file(self): From 29ce63e87b92f9b0513509bc27744422caa724ef Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Sun, 24 Nov 2019 00:42:06 -0600 Subject: [PATCH 03/22] render PBS job header with template --- dask_jobqueue/pbs.py | 58 +++++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index b89d7c7e..f866b63e 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -34,6 +34,19 @@ def pbs_format_bytes_ceil(n): return "%dB" % n +def pbs_format_resource_spec(resource_spec, worker_cores, worker_memory): + if resource_spec is None: + # Compute default resources specifications + resource_spec = "select=1:ncpus=%d" % worker_cores + memory_string = pbs_format_bytes_ceil(worker_memory) + resource_spec += ":mem=" + memory_string + logger.info( + "Resource specification for PBS not set, initializing it to %s" + % resource_spec + ) + return resource_spec + + class PBSJob(Job): submit_command = "qsub" cancel_command = "qdel" @@ -73,37 +86,26 @@ def __init__( # Try to find a project name from environment variable project = project or os.environ.get("PBS_ACCOUNT") - header_lines = [] - # PBS header build - if self.job_name is not None: - header_lines.append("#PBS -N %s" % self.job_name) - if queue is not None: - header_lines.append("#PBS -q %s" % queue) - if project is not None: - header_lines.append("#PBS -A %s" % project) - if resource_spec is None: - # Compute default resources specifications - resource_spec = "select=1:ncpus=%d" % self.worker_cores - memory_string = pbs_format_bytes_ceil(self.worker_memory) - resource_spec += ":mem=" + memory_string - logger.info( - "Resource specification for PBS not set, initializing it to %s" - % resource_spec - ) - if resource_spec is not None: - header_lines.append("#PBS -l %s" % resource_spec) - if walltime is not None: - header_lines.append("#PBS -l walltime=%s" % walltime) - if self.log_directory is not None: - header_lines.append("#PBS -e %s/" % self.log_directory) - header_lines.append("#PBS -o %s/" % self.log_directory) - header_lines.extend(["#PBS %s" % arg for arg in job_extra]) - - # Declare class attribute that shall be overridden - self.job_header = "\n".join(header_lines) + self.job_header = self.template_env.get_template('pbs_job_header').render( + job_name=self.job_name, + queue=queue, + project=project, + resource_spec=resource_spec, + worker_cores=self.worker_cores, + worker_memory=self.worker_memory, + walltime=walltime, + log_directory=self.log_directory, + job_extra=job_extra, + ) logger.debug("Job script: \n %s" % self.job_script()) + @property + def template_env(self): + env = super().template_env + env.filters['pbs_format_resource_spec'] = pbs_format_resource_spec + return env + class PBSCluster(JobQueueCluster): __doc__ = """ Launch Dask on a PBS cluster From 8f94ba84872e3875ba793c6a85e9f6b5e7c52a28 Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Sun, 24 Nov 2019 02:20:55 -0600 Subject: [PATCH 04/22] apply black --- dask_jobqueue/core.py | 6 +++--- dask_jobqueue/pbs.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 8510cefe..91915ae0 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -238,7 +238,7 @@ def __init__( self._env_header = "\n".join(filter(None, env_extra)) self.header_skip = set(header_skip) - self._command_template = self.template_env.get_template('command').render( + self._command_template = self.template_env.get_template("command").render( python=python, scheduler=self.scheduler, threads=self.worker_process_threads, @@ -274,7 +274,7 @@ def template_env(self): """ Jinja2 template rendering environment """ - return Environment(loader=PackageLoader('dask_jobqueue', 'templates')) + return Environment(loader=PackageLoader("dask_jobqueue", "templates")) def job_script(self): """Construct a job submission script""" @@ -291,7 +291,7 @@ def job_script(self): "env_header": self._env_header, "worker_command": self._command_template, } - return self.template_env.get_template('script.sh').render(**pieces) + return self.template_env.get_template("script.sh").render(**pieces) @contextmanager def job_file(self): diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index f866b63e..41c20bfe 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -86,7 +86,7 @@ def __init__( # Try to find a project name from environment variable project = project or os.environ.get("PBS_ACCOUNT") - self.job_header = self.template_env.get_template('pbs_job_header').render( + self.job_header = self.template_env.get_template("pbs_job_header").render( job_name=self.job_name, queue=queue, project=project, @@ -103,7 +103,7 @@ def __init__( @property def template_env(self): env = super().template_env - env.filters['pbs_format_resource_spec'] = pbs_format_resource_spec + env.filters["pbs_format_resource_spec"] = pbs_format_resource_spec return env From 00702edb12122afb451d7650ae58b73efeb815a8 Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Sun, 24 Nov 2019 02:21:13 -0600 Subject: [PATCH 05/22] formatting --- dask_jobqueue/templates/pbs_job_header | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/dask_jobqueue/templates/pbs_job_header b/dask_jobqueue/templates/pbs_job_header index 8373ea85..660e2dac 100644 --- a/dask_jobqueue/templates/pbs_job_header +++ b/dask_jobqueue/templates/pbs_job_header @@ -1,23 +1,23 @@ {% if job_name is not none -%} -#PBS -N {{job_name}} +#PBS -N {{ job_name }} {%- endif %} {% if queue is not none -%} -#PBS -q {{queue}} +#PBS -q {{ queue }} {%- endif %} {% if project is not none -%} -#PBS -A {{project}} +#PBS -A {{ project }} {%- endif %} -#PBS -l {{resource_spec | pbs_format_resource_spec(worker_cores, worker_memory) }} +#PBS -l {{ resource_spec | pbs_format_resource_spec(worker_cores, worker_memory) }} {% if walltime is not none -%} -#PBS -l walltime={{walltime}} +#PBS -l walltime={{ walltime }} {%- endif %} {% if log_directory is not none -%} -#PBS -o {{log_directory}}/ -#PBS -e {{log_directory}}/ +#PBS -o {{ log_directory }}/ +#PBS -e {{ log_directory }}/ {%- endif %} {%- if job_extra is not none -%} {%- for j in job_extra %} -#PBS {{j}} +#PBS {{ j }} {%- endfor -%} {%- endif %} JOB_ID=${PBS_JOBID%%.*} \ No newline at end of file From 6386c3e75bec6562d93e37217f6b11e3eae2f4d1 Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Sun, 24 Nov 2019 02:22:05 -0600 Subject: [PATCH 06/22] template for slurm job header --- dask_jobqueue/templates/slurm_job_header | 26 ++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 dask_jobqueue/templates/slurm_job_header diff --git a/dask_jobqueue/templates/slurm_job_header b/dask_jobqueue/templates/slurm_job_header new file mode 100644 index 00000000..0ec74a53 --- /dev/null +++ b/dask_jobqueue/templates/slurm_job_header @@ -0,0 +1,26 @@ +{% if job_name is not none -%} +#SBATCH -J {{ job_name }} +{%- endif %} +{% if log_directory is not none -%} +#SBATCH -o {{ log_directory }}/{{ job_name if job_name is not none else "worker" }}-%J.err +#SBATCH -e {{ log_directory }}/{{ job_name if job_name is not none else "worker" }}-%J.out +{%- endif %} +{% if queue is not none -%} +#SBATCH -p {{ queue }} +{%- endif %} +{% if project is not none -%} +#SBATCH -A {{ project }} +{%- endif %} +#SBATCH -n 1 +#SBATCH --cpus-per-task={{ job_cpu if job_cpu is not none else worker_cores }} +#SBATCH --mem={{ memory | slurm_format_memory(worker_memory) }} +{% if walltime is not none -%} +#SBATCH -t {{ walltime }} +{%- endif %} +{%- if job_extra is not none -%} +{%- for j in job_extra %} +#SBATCH {{ j }} +{%- endfor -%} +{%- endif %} + +JOB_ID=${SLURM_JOB_ID%;*} \ No newline at end of file From 4cd56d24c6161fb3b2d33da9417699a1451a5fbd Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Sun, 24 Nov 2019 02:22:30 -0600 Subject: [PATCH 07/22] render slurm job header with jinja2 --- dask_jobqueue/slurm.py | 55 ++++++++++++++++-------------------------- 1 file changed, 21 insertions(+), 34 deletions(-) diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index e2871b02..548aff41 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -44,43 +44,30 @@ def __init__( if job_extra is None: job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name) - header_lines = [] - # SLURM header build - if self.job_name is not None: - header_lines.append("#SBATCH -J %s" % self.job_name) - if self.log_directory is not None: - header_lines.append( - "#SBATCH -e %s/%s-%%J.err" - % (self.log_directory, self.job_name or "worker") - ) - header_lines.append( - "#SBATCH -o %s/%s-%%J.out" - % (self.log_directory, self.job_name or "worker") - ) - if queue is not None: - header_lines.append("#SBATCH -p %s" % queue) - if project is not None: - header_lines.append("#SBATCH -A %s" % project) - - # Init resources, always 1 task, - # and then number of cpu is processes * threads if not set - header_lines.append("#SBATCH -n 1") - header_lines.append( - "#SBATCH --cpus-per-task=%d" % (job_cpu or self.worker_cores) + self.job_header = self.template_env.get_template("slurm_job_header").render( + job_name=self.job_name, + log_directory=self.log_directory, + queue=queue, + project=project, + job_cpu=job_cpu, + worker_cores=self.worker_cores, + memory=job_mem, + worker_memory=self.worker_memory, + walltime=walltime, + job_extra=job_extra, ) - # Memory - memory = job_mem - if job_mem is None: - memory = slurm_format_bytes_ceil(self.worker_memory) - if memory is not None: - header_lines.append("#SBATCH --mem=%s" % memory) - if walltime is not None: - header_lines.append("#SBATCH -t %s" % walltime) - header_lines.extend(["#SBATCH %s" % arg for arg in job_extra]) + @property + def template_env(self): + env = super().template_env + env.filters["slurm_format_memory"] = slurm_format_memory + return env + - # Declare class attribute that shall be overridden - self.job_header = "\n".join(header_lines) +def slurm_format_memory(memory, worker_memory): + if memory is None: + memory = slurm_format_bytes_ceil(worker_memory) + return memory def slurm_format_bytes_ceil(n): From 9f4d995474a0a4a2ae1389e3eb18b55b433e1970 Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Mon, 25 Nov 2019 02:03:54 -0600 Subject: [PATCH 08/22] use template to generate condor job script --- dask_jobqueue/htcondor.py | 136 ++++++--------------- dask_jobqueue/templates/htcondor_script.sh | 29 +++++ 2 files changed, 63 insertions(+), 102 deletions(-) create mode 100644 dask_jobqueue/templates/htcondor_script.sh diff --git a/dask_jobqueue/htcondor.py b/dask_jobqueue/htcondor.py index 981b2672..304f25e7 100644 --- a/dask_jobqueue/htcondor.py +++ b/dask_jobqueue/htcondor.py @@ -11,18 +11,6 @@ class HTCondorJob(Job): - _script_template = """ -%(shebang)s - -%(job_header)s - -Environment = "%(quoted_environment)s" -Arguments = "%(quoted_arguments)s" -Executable = %(executable)s - -Queue -""".lstrip() - submit_command = "condor_submit" cancel_command = "condor_rm" job_id_regexp = r"(?P\d+\.\d+)" @@ -65,63 +53,21 @@ def __init__( env_extra = dask.config.get( "jobqueue.%s.env-extra" % self.config_name, default=[] ) - self.env_dict = self.env_lines_to_dict(env_extra) - - self.job_header_dict = { - "MY.DaskWorkerName": '"htcondor--$F(MY.JobId)--"', - "RequestCpus": "MY.DaskWorkerCores", - "RequestMemory": "floor(MY.DaskWorkerMemory / 1048576)", - "RequestDisk": "floor(MY.DaskWorkerDisk / 1024)", - "MY.JobId": '"$(ClusterId).$(ProcId)"', - "MY.DaskWorkerCores": self.worker_cores, - "MY.DaskWorkerMemory": self.worker_memory, - "MY.DaskWorkerDisk": self.worker_disk, - } - if self.log_directory: - self.job_header_dict.update( - { - "LogDirectory": self.log_directory, - # $F(...) strips quotes - "Output": "$(LogDirectory)/worker-$F(MY.JobId).out", - "Error": "$(LogDirectory)/worker-$F(MY.JobId).err", - "Log": "$(LogDirectory)/worker-$(ClusterId).log", - # We kill all the workers to stop them so we need to stream their - # output+error if we ever want to see anything - "Stream_Output": True, - "Stream_Error": True, - } - ) - if self.job_extra: - self.job_header_dict.update(self.job_extra) - - def env_lines_to_dict(self, env_lines): - """Convert an array of export statements (what we get from env-extra - in the config) into a dict""" - env_dict = {} - for env_line in env_lines: - split_env_line = shlex.split(env_line) - if split_env_line[0] == "export": - split_env_line = split_env_line[1:] - for item in split_env_line: - if "=" in item: - k, v = item.split("=", 1) - env_dict[k] = v - return env_dict + self.env_extra = env_extra def job_script(self): - """Construct a job submission script""" - quoted_arguments = quote_arguments(["-c", self._command_template]) - quoted_environment = quote_environment(self.env_dict) - job_header_lines = "\n".join( - "%s = %s" % (k, v) for k, v in self.job_header_dict.items() + """ Construct a job submission script """ + return self.template_env.get_template("htcondor_script.sh").render( + shebang=self.shebang, + worker_command=self._command_template, + executable=self.executable, + log_directory=self.log_directory, + worker_cores=self.worker_cores, + worker_disk=self.worker_disk, + worker_memory=self.worker_memory, + job_extra=self.job_extra, + env_extra=self.env_extra, ) - return self._script_template % { - "shebang": self.shebang, - "job_header": job_header_lines, - "quoted_environment": quoted_environment, - "quoted_arguments": quoted_arguments, - "executable": self.executable, - } def _job_id_from_submit_output(self, out): cluster_id_regexp = r"submitted to cluster (\d+)" @@ -135,47 +81,33 @@ def _job_id_from_submit_output(self, out): raise ValueError(msg) return "%s.0" % match.group(1) + @property + def template_env(self): + env = super().template_env + env.filters['env_lines_to_dict'] = env_lines_to_dict + env.filters['quote_environment'] = quote_environment + return env + + +def env_lines_to_dict(env_lines): + """ Convert an array of export statements (what we get from env-extra + in the config) into a dict """ + env_dict = {} + for env_line in env_lines: + split_env_line = shlex.split(env_line) + if split_env_line[0] == "export": + split_env_line = split_env_line[1:] + for item in split_env_line: + if "=" in item: + k, v = item.split("=", 1) + env_dict[k] = v + return env_dict + def _double_up_quotes(instr): return instr.replace("'", "''").replace('"', '""') -def quote_arguments(args): - """Quote a string or list of strings using the Condor submit file "new" argument quoting rules. - - Returns - ------- - str - The arguments in a quoted form. - - Warnings - -------- - You will need to surround the result in double-quotes before using it in - the Arguments attribute. - - Examples - -------- - >>> quote_arguments(["3", "simple", "arguments"]) - '3 simple arguments' - >>> quote_arguments(["one", "two with spaces", "three"]) - 'one \'two with spaces\' three' - >>> quote_arguments(["one", "\"two\"", "spacy 'quoted' argument"]) - 'one ""two"" \'spacey \'\'quoted\'\' argument\'' - """ - if isinstance(args, str): - args_list = [args] - else: - args_list = args - - quoted_args = [] - for a in args_list: - qa = _double_up_quotes(a) - if " " in qa or "'" in qa: - qa = "'" + qa + "'" - quoted_args.append(qa) - return " ".join(quoted_args) - - def quote_environment(env): """Quote a dict of strings using the Condor submit file "new" environment quoting rules. diff --git a/dask_jobqueue/templates/htcondor_script.sh b/dask_jobqueue/templates/htcondor_script.sh new file mode 100644 index 00000000..a294d757 --- /dev/null +++ b/dask_jobqueue/templates/htcondor_script.sh @@ -0,0 +1,29 @@ +{{ shebang }} + +MY.DaskWorkerName = "htcondor--$F(MY.JobId)--" +RequestCpus = MY.DaskWorkerCores +RequestMemory = floor(MY.DaskWorkerMemory / 1048576) +RequestDisk = floor(MY.DaskWorkerDisk / 1024) +MY.JobId = "$(ClusterId).$(ProcId)" +MY.DaskWorkerCores = {{ worker_cores }} +MY.DaskWorkerMemory = {{ worker_memory }} +MY.DaskWorkerDisk = {{ worker_disk }} +{%- if job_extra is not none -%} +{%- for k,v in job_extra.items() %} +{{ k }} = {{ v }} +{%- endfor -%} +{%- endif %} +{% if log_directory is not none -%} +LogDirectory = {{ log_directory }} +Output = $(LogDirectory)/worker-$F(MY.JobId).out +Error = $(LogDirectory)/worker-$F(MY.JobId).err +Log = $(LogDirectory)/worker-$(ClusterId).log +Stream_Output = True +Stream_Error = True +{%- endif %} + +Environment = "{{ env_extra | env_lines_to_dict | quote_environment }} JOB_ID=$F(MY.JobId)" +Arguments = "-c '{{ worker_command }}'" +Executable = {{ executable }} + +Queue \ No newline at end of file From 9e571eed40b38196a5fe68a64c33c2bd8110540a Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Mon, 25 Nov 2019 02:04:18 -0600 Subject: [PATCH 09/22] remove header test --- dask_jobqueue/tests/test_htcondor.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/dask_jobqueue/tests/test_htcondor.py b/dask_jobqueue/tests/test_htcondor.py index 6e163fc9..3b551d7d 100644 --- a/dask_jobqueue/tests/test_htcondor.py +++ b/dask_jobqueue/tests/test_htcondor.py @@ -11,13 +11,6 @@ QUEUE_WAIT = 30 # seconds -def test_header(): - with HTCondorCluster(cores=1, memory="100MB", disk="100MB") as cluster: - assert cluster._dummy_job.job_header_dict["MY.DaskWorkerCores"] == 1 - assert cluster._dummy_job.job_header_dict["MY.DaskWorkerDisk"] == 100000000 - assert cluster._dummy_job.job_header_dict["MY.DaskWorkerMemory"] == 100000000 - - def test_job_script(): with HTCondorCluster( cores=4, From 7fb6f75ad7295ff7e1cb9454999a0617697ea2d8 Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Mon, 25 Nov 2019 02:06:20 -0600 Subject: [PATCH 10/22] apply black --- dask_jobqueue/htcondor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_jobqueue/htcondor.py b/dask_jobqueue/htcondor.py index 304f25e7..c7c0f702 100644 --- a/dask_jobqueue/htcondor.py +++ b/dask_jobqueue/htcondor.py @@ -84,8 +84,8 @@ def _job_id_from_submit_output(self, out): @property def template_env(self): env = super().template_env - env.filters['env_lines_to_dict'] = env_lines_to_dict - env.filters['quote_environment'] = quote_environment + env.filters["env_lines_to_dict"] = env_lines_to_dict + env.filters["quote_environment"] = quote_environment return env From 918d17339471b4a2fd9b4041b1b52bc9db3b1aa7 Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Mon, 25 Nov 2019 02:09:42 -0600 Subject: [PATCH 11/22] remove trailing space --- dask_jobqueue/templates/command | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_jobqueue/templates/command b/dask_jobqueue/templates/command index 8edd479d..a331dbb5 100644 --- a/dask_jobqueue/templates/command +++ b/dask_jobqueue/templates/command @@ -18,5 +18,5 @@ {{ extra | join(" ") }}{{" "}} {%- endif -%} {%- if interface -%} ---interface {{ interface }}{{" "}} +--interface {{ interface }} {%- endif -%} From 1edb3cd2fcdaafabc5376c3c020109daee0104df Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Wed, 27 Nov 2019 01:59:30 -0600 Subject: [PATCH 12/22] remove old script template --- dask_jobqueue/core.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 91915ae0..ad1790aa 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -115,14 +115,6 @@ class Job(ProcessInterface, abc.ABC): job_parameters=job_parameters ) - _script_template = """ -%(shebang)s - -%(job_header)s -%(env_header)s -%(worker_command)s -""".lstrip() - # Following class attributes should be overridden by extending classes. submit_command = None cancel_command = None From caad97547776596e862e2825d5de60a07322b981 Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Wed, 27 Nov 2019 01:59:47 -0600 Subject: [PATCH 13/22] render LSF job header with Jinja --- dask_jobqueue/lsf.py | 85 +++++++++++++------------- dask_jobqueue/templates/lsf_job_header | 33 ++++++++++ 2 files changed, 75 insertions(+), 43 deletions(-) create mode 100644 dask_jobqueue/templates/lsf_job_header diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 3675976c..0b680f22 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -57,52 +57,32 @@ def __init__( use_stdin = dask.config.get("jobqueue.%s.use-stdin" % self.config_name) self.use_stdin = use_stdin - header_lines = [] - # LSF header build - if self.name is not None: - header_lines.append("#BSUB -J %s" % self.job_name) - if self.log_directory is not None: - header_lines.append( - "#BSUB -e %s/%s-%%J.err" % (self.log_directory, self.name or "worker") - ) - header_lines.append( - "#BSUB -o %s/%s-%%J.out" % (self.log_directory, self.name or "worker") - ) - if queue is not None: - header_lines.append("#BSUB -q %s" % queue) - if project is not None: - header_lines.append('#BSUB -P "%s"' % project) - if ncpus is None: - # Compute default cores specifications - ncpus = self.worker_cores - logger.info( - "ncpus specification for LSF not set, initializing it to %s" % ncpus - ) - if ncpus is not None: - header_lines.append("#BSUB -n %s" % ncpus) - if ncpus > 1: - # span[hosts=1] _might_ affect queue waiting - # time, and is not required if ncpus==1 - header_lines.append('#BSUB -R "span[hosts=1]"') - if mem is None: - # Compute default memory specifications - mem = self.worker_memory - logger.info( - "mem specification for LSF not set, initializing it to %s bytes" % mem - ) - if mem is not None: - lsf_units = lsf_units if lsf_units is not None else lsf_detect_units() - memory_string = lsf_format_bytes_ceil(mem, lsf_units=lsf_units) - header_lines.append("#BSUB -M %s" % memory_string) - if walltime is not None: - header_lines.append("#BSUB -W %s" % walltime) - header_lines.extend(["#BSUB %s" % arg for arg in job_extra]) - - # Declare class attribute that shall be overridden - self.job_header = "\n".join(header_lines) + self.job_header = self.template_env.get_template("lsf_job_header").render( + name=self.name, + job_name=self.job_name, + log_directory=self.log_directory, + queue=queue, + project=project, + ncpus=ncpus, + worker_cores=self.worker_cores, + mem=mem, + worker_memory=self.worker_memory, + lsf_units=lsf_units, + walltime=walltime, + job_extra=job_extra, + logger=logger, + ) logger.debug("Job script: \n %s" % self.job_script()) + @property + def template_env(self): + env = super().template_env + env.filters['set_ncpus'] = set_ncpus + env.filters['set_mem'] = set_mem + env.filters['format_memory'] = format_memory + return env + async def _submit_job(self, script_filename): if self.use_stdin: piped_cmd = [self.submit_command + "< " + script_filename + " 2> /dev/null"] @@ -112,6 +92,25 @@ async def _submit_job(self, script_filename): return result +def set_ncpus(ncpus, worker_cores, logger): + if ncpus is None: + ncpus = worker_cores + logger.info("ncpus specification for LSF not set, initializing it to %s" % ncpus) + return ncpus + + +def set_mem(mem, worker_memory, logger): + if mem is None: + mem = worker_memory + logger.info("mem specification for LSF not set, initializing it to %s bytes" % mem) + return mem + + +def format_memory(mem, lsf_units): + lsf_units = lsf_units if lsf_units is not None else lsf_detect_units() + return lsf_format_bytes_ceil(mem, lsf_units=lsf_units) + + def lsf_format_bytes_ceil(n, lsf_units="mb"): """Format bytes as text diff --git a/dask_jobqueue/templates/lsf_job_header b/dask_jobqueue/templates/lsf_job_header new file mode 100644 index 00000000..1313a5d6 --- /dev/null +++ b/dask_jobqueue/templates/lsf_job_header @@ -0,0 +1,33 @@ +{% if name is not none -%} +#BSUB -J {{ job_name }} +{%- endif %} +{% if log_directory is not none -%} +#BSUB -e {{ log_directory }}/{{ name if name is not none else 'worker' }}-%J.err +#BSUB -o {{ log_directory }}/{{ name if name is not none else 'worker' }}-%J.out +{%- endif %} +{% if queue is not none -%} +#BSUB -q {{ queue }} +{%- endif %} +{% if project is not none -%} +#BSUB -P {{ project }} +{%- endif %} +{%- set ncpus = ncpus | set_ncpus(worker_cores, logger) -%} +{% if ncpus is not none -%} +#BSUB -n {{ ncpus }} +{% if ncpus > 1 -%} +#BSUB -R "span[hosts=1]" +{%- endif %} +{%- endif %} +{%- set mem = mem | set_mem(worker_memory, logger) -%} +{% if mem is not none -%} +#BSUB -M {{ mem | format_memory(lsf_units) }} +{%- endif %} +{% if walltime is not none -%} +#BSUB -W {{ walltime }} +{%- endif %} +{%- if job_extra is not none -%} +{%- for j in job_extra %} +#BSUB {{ j }} +{%- endfor -%} +{%- endif %} +JOB_ID=${LSB_JOBID%.*} \ No newline at end of file From 272565aa2c8d4fb9c5a057d43d9df3acb9ee2087 Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Wed, 27 Nov 2019 02:00:01 -0600 Subject: [PATCH 14/22] render OAR job header with Jinja --- dask_jobqueue/oar.py | 61 +++++++++++++++----------- dask_jobqueue/templates/oar_job_header | 22 ++++++++++ 2 files changed, 58 insertions(+), 25 deletions(-) create mode 100644 dask_jobqueue/templates/oar_job_header diff --git a/dask_jobqueue/oar.py b/dask_jobqueue/oar.py index 13e8cceb..06d17981 100644 --- a/dask_jobqueue/oar.py +++ b/dask_jobqueue/oar.py @@ -45,31 +45,42 @@ def __init__( if job_extra is None: job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name) - header_lines = [] - if self.job_name is not None: - header_lines.append("#OAR -n %s" % self.job_name) - if queue is not None: - header_lines.append("#OAR -q %s" % queue) - if project is not None: - header_lines.append("#OAR --project %s" % project) - - # OAR needs to have the resource on a single line otherwise it is - # considered as a "moldable job" (i.e. the scheduler can chose between - # multiple sets of resources constraints) - resource_spec_list = [] - if resource_spec is None: - # default resource_spec if not specified. Crucial to specify - # nodes=1 to make sure the cores allocated are on the same node. - resource_spec = "/nodes=1/core=%d" % self.worker_cores - resource_spec_list.append(resource_spec) - if walltime is not None: - resource_spec_list.append("walltime=%s" % walltime) - - full_resource_spec = ",".join(resource_spec_list) - header_lines.append("#OAR -l %s" % full_resource_spec) - header_lines.extend(["#OAR %s" % arg for arg in job_extra]) - - self.job_header = "\n".join(header_lines) + #header_lines = [] + #if self.job_name is not None: + # header_lines.append("#OAR -n %s" % self.job_name) + #if queue is not None: + # header_lines.append("#OAR -q %s" % queue) + #if project is not None: + # header_lines.append("#OAR --project %s" % project) +# + ## OAR needs to have the resource on a single line otherwise it is + ## considered as a "moldable job" (i.e. the scheduler can chose between + ## multiple sets of resources constraints) + #resource_spec_list = [] + #if resource_spec is None: + # # default resource_spec if not specified. Crucial to specify + # # nodes=1 to make sure the cores allocated are on the same node. + # resource_spec = "/nodes=1/core=%d" % self.worker_cores + #resource_spec_list.append(resource_spec) + #if walltime is not None: + # resource_spec_list.append("walltime=%s" % walltime) +# + #full_resource_spec = ",".join(resource_spec_list) + #header_lines.append("#OAR -l %s" % full_resource_spec) + #header_lines.extend(["#OAR %s" % arg for arg in job_extra]) + #header_lines.append("JOB_ID=${OAR_JOB_ID}") +# + ##self.job_header = "\n".join(header_lines) + + self.job_header = self.template_env.get_template("oar_job_header").render( + job_name=self.job_name, + queue=queue, + project=project, + resource_spec=resource_spec, + walltime=walltime, + worker_cores=self.worker_cores, + job_extra=job_extra, + ) logger.debug("Job script: \n %s" % self.job_script()) diff --git a/dask_jobqueue/templates/oar_job_header b/dask_jobqueue/templates/oar_job_header new file mode 100644 index 00000000..aa30d126 --- /dev/null +++ b/dask_jobqueue/templates/oar_job_header @@ -0,0 +1,22 @@ +{% if job_name is not none -%} +#OAR -n {{ job_name }} +{%- endif %} +{% if queue is not none -%} +#OAR -q {{ queue }} +{%- endif %} +{% if project is not none -%} +#OAR --project {{ project }} +{%- endif %} +#OAR -l{{ " " }} +{%- if resource_spec is none %}/nodes=1/core={{ worker_cores }} +{%- else %}{{ resource_spec }} +{%- endif %} +{%- if walltime is not none -%} +,walltime={{ walltime }} +{%- endif %} +{%- if job_extra is not none -%} +{%- for j in job_extra %} +#OAR {{ j }} +{%- endfor -%} +{%- endif %} +JOB_ID=${OAR_JOB_ID} \ No newline at end of file From 88f1bad0fb5116456c2b2813b1f458c4e65247d8 Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Wed, 27 Nov 2019 02:00:13 -0600 Subject: [PATCH 15/22] render SGE job header with Jinja --- dask_jobqueue/sge.py | 37 +++++++------------------- dask_jobqueue/templates/sge_job_header | 26 ++++++++++++++++++ 2 files changed, 35 insertions(+), 28 deletions(-) create mode 100644 dask_jobqueue/templates/sge_job_header diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 10fa24a6..7c3ad4be 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -41,34 +41,15 @@ def __init__( if job_extra is None: job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name) - header_lines = [] - if self.job_name is not None: - header_lines.append("#$ -N %(job-name)s") - if queue is not None: - header_lines.append("#$ -q %(queue)s") - if project is not None: - header_lines.append("#$ -P %(project)s") - if resource_spec is not None: - header_lines.append("#$ -l %(resource_spec)s") - if walltime is not None: - header_lines.append("#$ -l h_rt=%(walltime)s") - if self.log_directory is not None: - header_lines.append("#$ -e %(log_directory)s/") - header_lines.append("#$ -o %(log_directory)s/") - header_lines.extend(["#$ -cwd", "#$ -j y"]) - header_lines.extend(["#$ %s" % arg for arg in job_extra]) - header_template = "\n".join(header_lines) - - config = { - "job-name": self.job_name, - "queue": queue, - "project": project, - "processes": self.worker_processes, - "walltime": walltime, - "resource_spec": resource_spec, - "log_directory": self.log_directory, - } - self.job_header = header_template % config + self.job_header = self.template_env.get_template("sge_job_header").render( + job_name=self.job_name, + queue=queue, + project=project, + walltime=walltime, + resource_spec=resource_spec, + log_directory=self.log_directory, + job_extra=job_extra, + ) logger.debug("Job script: \n %s" % self.job_script()) diff --git a/dask_jobqueue/templates/sge_job_header b/dask_jobqueue/templates/sge_job_header new file mode 100644 index 00000000..0a7c4728 --- /dev/null +++ b/dask_jobqueue/templates/sge_job_header @@ -0,0 +1,26 @@ +{% if job_name is not none -%} +#$ -N {{ job_name }} +{%- endif %} +{% if queue is not none -%} +#$ -q {{ queue }} +{%- endif %} +{% if project is not none -%} +#$ -P {{ project }} +{%- endif %} +{% if resource_spec is not none -%} +#$ -l {{ resource_spec }} +{%- endif %} +{% if walltime is not none -%} +#$ -l h_rt={{ walltime }} +{%- endif %} +{% if log_directory is not none -%} +#$ -e {{ log_directory }}/ +#$ -o {{ log_directory }}/ +{%- endif %} +#$ -cwd +#$ -j y +{%- if job_extra is not none -%} +{%- for j in job_extra %} +#$ {{ j }} +{%- endfor -%} +{%- endif %} \ No newline at end of file From 5939574f1617667de7f5f0a46b0a830c5a6f283a Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Wed, 27 Nov 2019 02:15:33 -0600 Subject: [PATCH 16/22] apply black --- dask_jobqueue/lsf.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 0b680f22..d0224482 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -78,9 +78,9 @@ def __init__( @property def template_env(self): env = super().template_env - env.filters['set_ncpus'] = set_ncpus - env.filters['set_mem'] = set_mem - env.filters['format_memory'] = format_memory + env.filters["set_ncpus"] = set_ncpus + env.filters["set_mem"] = set_mem + env.filters["format_memory"] = format_memory return env async def _submit_job(self, script_filename): @@ -95,14 +95,18 @@ async def _submit_job(self, script_filename): def set_ncpus(ncpus, worker_cores, logger): if ncpus is None: ncpus = worker_cores - logger.info("ncpus specification for LSF not set, initializing it to %s" % ncpus) + logger.info( + "ncpus specification for LSF not set, initializing it to %s" % ncpus + ) return ncpus def set_mem(mem, worker_memory, logger): if mem is None: mem = worker_memory - logger.info("mem specification for LSF not set, initializing it to %s bytes" % mem) + logger.info( + "mem specification for LSF not set, initializing it to %s bytes" % mem + ) return mem From 97d64ceaa3a45da92276fa679990d82f2e463d0a Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Wed, 27 Nov 2019 02:16:00 -0600 Subject: [PATCH 17/22] remove old templating code --- dask_jobqueue/oar.py | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/dask_jobqueue/oar.py b/dask_jobqueue/oar.py index 06d17981..c56c62d8 100644 --- a/dask_jobqueue/oar.py +++ b/dask_jobqueue/oar.py @@ -45,33 +45,6 @@ def __init__( if job_extra is None: job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name) - #header_lines = [] - #if self.job_name is not None: - # header_lines.append("#OAR -n %s" % self.job_name) - #if queue is not None: - # header_lines.append("#OAR -q %s" % queue) - #if project is not None: - # header_lines.append("#OAR --project %s" % project) -# - ## OAR needs to have the resource on a single line otherwise it is - ## considered as a "moldable job" (i.e. the scheduler can chose between - ## multiple sets of resources constraints) - #resource_spec_list = [] - #if resource_spec is None: - # # default resource_spec if not specified. Crucial to specify - # # nodes=1 to make sure the cores allocated are on the same node. - # resource_spec = "/nodes=1/core=%d" % self.worker_cores - #resource_spec_list.append(resource_spec) - #if walltime is not None: - # resource_spec_list.append("walltime=%s" % walltime) -# - #full_resource_spec = ",".join(resource_spec_list) - #header_lines.append("#OAR -l %s" % full_resource_spec) - #header_lines.extend(["#OAR %s" % arg for arg in job_extra]) - #header_lines.append("JOB_ID=${OAR_JOB_ID}") -# - ##self.job_header = "\n".join(header_lines) - self.job_header = self.template_env.get_template("oar_job_header").render( job_name=self.job_name, queue=queue, From 29aa9bb9a7be8f5de8bd346b11e606b7d5d8d488 Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Wed, 27 Nov 2019 02:29:53 -0600 Subject: [PATCH 18/22] fix formatting --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index dfcf3188..e301f6c8 100755 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ python_requires=">=3.6", license="BSD 3-Clause", packages=["dask_jobqueue"], - package_data={'dask_jobqueue': ['templates/*']}, + package_data={"dask_jobqueue": ["templates/*"]}, include_package_data=True, install_requires=install_requires, tests_require=["pytest >= 2.7.1"], From ca8b697b0ae44f1dc89735156f466cdb242f7411 Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Wed, 27 May 2020 14:59:51 -0400 Subject: [PATCH 19/22] add jinja2 to requirements --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index e1a85007..ab270ad7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ dask>=2.19 distributed>=2.19 +Jinja2 From 2fe7e305da6d179ccdc7fe61ff69b9fae8c3ea6e Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Mon, 5 Jul 2021 17:00:18 -0400 Subject: [PATCH 20/22] formatting --- dask_jobqueue/htcondor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_jobqueue/htcondor.py b/dask_jobqueue/htcondor.py index c7c0f702..72e54a07 100644 --- a/dask_jobqueue/htcondor.py +++ b/dask_jobqueue/htcondor.py @@ -56,7 +56,7 @@ def __init__( self.env_extra = env_extra def job_script(self): - """ Construct a job submission script """ + """Construct a job submission script""" return self.template_env.get_template("htcondor_script.sh").render( shebang=self.shebang, worker_command=self._command_template, @@ -90,8 +90,8 @@ def template_env(self): def env_lines_to_dict(env_lines): - """ Convert an array of export statements (what we get from env-extra - in the config) into a dict """ + """Convert an array of export statements (what we get from env-extra + in the config) into a dict""" env_dict = {} for env_line in env_lines: split_env_line = shlex.split(env_line) From 32e1d3faca662cc82bec5fb901bd9a9d086062df Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 6 Jul 2021 10:21:14 +0100 Subject: [PATCH 21/22] Add quotes to LSF template and rename templates with Jinja2 file extension --- dask_jobqueue/core.py | 2 +- dask_jobqueue/lsf.py | 2 +- dask_jobqueue/oar.py | 2 +- dask_jobqueue/pbs.py | 2 +- dask_jobqueue/sge.py | 2 +- dask_jobqueue/slurm.py | 2 +- dask_jobqueue/templates/{command => command.j2} | 0 .../templates/{lsf_job_header => lsf_job_header.j2} | 6 +++--- .../templates/{oar_job_header => oar_job_header.j2} | 0 .../templates/{pbs_job_header => pbs_job_header.j2} | 0 .../templates/{sge_job_header => sge_job_header.j2} | 0 .../templates/{slurm_job_header => slurm_job_header.j2} | 0 12 files changed, 9 insertions(+), 9 deletions(-) rename dask_jobqueue/templates/{command => command.j2} (100%) rename dask_jobqueue/templates/{lsf_job_header => lsf_job_header.j2} (84%) rename dask_jobqueue/templates/{oar_job_header => oar_job_header.j2} (100%) rename dask_jobqueue/templates/{pbs_job_header => pbs_job_header.j2} (100%) rename dask_jobqueue/templates/{sge_job_header => sge_job_header.j2} (100%) rename dask_jobqueue/templates/{slurm_job_header => slurm_job_header.j2} (100%) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index ad1790aa..38439955 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -230,7 +230,7 @@ def __init__( self._env_header = "\n".join(filter(None, env_extra)) self.header_skip = set(header_skip) - self._command_template = self.template_env.get_template("command").render( + self._command_template = self.template_env.get_template("command.j2").render( python=python, scheduler=self.scheduler, threads=self.worker_process_threads, diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index d0224482..0db0a79c 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -57,7 +57,7 @@ def __init__( use_stdin = dask.config.get("jobqueue.%s.use-stdin" % self.config_name) self.use_stdin = use_stdin - self.job_header = self.template_env.get_template("lsf_job_header").render( + self.job_header = self.template_env.get_template("lsf_job_header.j2").render( name=self.name, job_name=self.job_name, log_directory=self.log_directory, diff --git a/dask_jobqueue/oar.py b/dask_jobqueue/oar.py index c56c62d8..7f24159d 100644 --- a/dask_jobqueue/oar.py +++ b/dask_jobqueue/oar.py @@ -45,7 +45,7 @@ def __init__( if job_extra is None: job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name) - self.job_header = self.template_env.get_template("oar_job_header").render( + self.job_header = self.template_env.get_template("oar_job_header.j2").render( job_name=self.job_name, queue=queue, project=project, diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 41c20bfe..c9102032 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -86,7 +86,7 @@ def __init__( # Try to find a project name from environment variable project = project or os.environ.get("PBS_ACCOUNT") - self.job_header = self.template_env.get_template("pbs_job_header").render( + self.job_header = self.template_env.get_template("pbs_job_header.j2").render( job_name=self.job_name, queue=queue, project=project, diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 7c3ad4be..39f4861c 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -41,7 +41,7 @@ def __init__( if job_extra is None: job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name) - self.job_header = self.template_env.get_template("sge_job_header").render( + self.job_header = self.template_env.get_template("sge_job_header.j2").render( job_name=self.job_name, queue=queue, project=project, diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 548aff41..a5f80b82 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -44,7 +44,7 @@ def __init__( if job_extra is None: job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name) - self.job_header = self.template_env.get_template("slurm_job_header").render( + self.job_header = self.template_env.get_template("slurm_job_header.j2").render( job_name=self.job_name, log_directory=self.log_directory, queue=queue, diff --git a/dask_jobqueue/templates/command b/dask_jobqueue/templates/command.j2 similarity index 100% rename from dask_jobqueue/templates/command rename to dask_jobqueue/templates/command.j2 diff --git a/dask_jobqueue/templates/lsf_job_header b/dask_jobqueue/templates/lsf_job_header.j2 similarity index 84% rename from dask_jobqueue/templates/lsf_job_header rename to dask_jobqueue/templates/lsf_job_header.j2 index 1313a5d6..a4ff2739 100644 --- a/dask_jobqueue/templates/lsf_job_header +++ b/dask_jobqueue/templates/lsf_job_header.j2 @@ -9,16 +9,16 @@ #BSUB -q {{ queue }} {%- endif %} {% if project is not none -%} -#BSUB -P {{ project }} +#BSUB -P "{{ project }}" {%- endif %} -{%- set ncpus = ncpus | set_ncpus(worker_cores, logger) -%} +{% set ncpus = ncpus | set_ncpus(worker_cores, logger) -%} {% if ncpus is not none -%} #BSUB -n {{ ncpus }} {% if ncpus > 1 -%} #BSUB -R "span[hosts=1]" {%- endif %} {%- endif %} -{%- set mem = mem | set_mem(worker_memory, logger) -%} +{% set mem = mem | set_mem(worker_memory, logger) -%} {% if mem is not none -%} #BSUB -M {{ mem | format_memory(lsf_units) }} {%- endif %} diff --git a/dask_jobqueue/templates/oar_job_header b/dask_jobqueue/templates/oar_job_header.j2 similarity index 100% rename from dask_jobqueue/templates/oar_job_header rename to dask_jobqueue/templates/oar_job_header.j2 diff --git a/dask_jobqueue/templates/pbs_job_header b/dask_jobqueue/templates/pbs_job_header.j2 similarity index 100% rename from dask_jobqueue/templates/pbs_job_header rename to dask_jobqueue/templates/pbs_job_header.j2 diff --git a/dask_jobqueue/templates/sge_job_header b/dask_jobqueue/templates/sge_job_header.j2 similarity index 100% rename from dask_jobqueue/templates/sge_job_header rename to dask_jobqueue/templates/sge_job_header.j2 diff --git a/dask_jobqueue/templates/slurm_job_header b/dask_jobqueue/templates/slurm_job_header.j2 similarity index 100% rename from dask_jobqueue/templates/slurm_job_header rename to dask_jobqueue/templates/slurm_job_header.j2 From def8d489f8dd5c49308892193aecc1e3045d59e7 Mon Sep 17 00:00:00 2001 From: Will Barnes Date: Thu, 8 Jul 2021 11:44:06 -0400 Subject: [PATCH 22/22] remove JOB_ID from templates --- dask_jobqueue/templates/htcondor_script.sh | 2 +- dask_jobqueue/templates/lsf_job_header.j2 | 3 +-- dask_jobqueue/templates/oar_job_header.j2 | 3 +-- dask_jobqueue/templates/pbs_job_header.j2 | 3 +-- dask_jobqueue/templates/slurm_job_header.j2 | 4 +--- 5 files changed, 5 insertions(+), 10 deletions(-) diff --git a/dask_jobqueue/templates/htcondor_script.sh b/dask_jobqueue/templates/htcondor_script.sh index a294d757..5845adb3 100644 --- a/dask_jobqueue/templates/htcondor_script.sh +++ b/dask_jobqueue/templates/htcondor_script.sh @@ -22,7 +22,7 @@ Stream_Output = True Stream_Error = True {%- endif %} -Environment = "{{ env_extra | env_lines_to_dict | quote_environment }} JOB_ID=$F(MY.JobId)" +Environment = "{{ env_extra | env_lines_to_dict | quote_environment }}" Arguments = "-c '{{ worker_command }}'" Executable = {{ executable }} diff --git a/dask_jobqueue/templates/lsf_job_header.j2 b/dask_jobqueue/templates/lsf_job_header.j2 index a4ff2739..9af17eca 100644 --- a/dask_jobqueue/templates/lsf_job_header.j2 +++ b/dask_jobqueue/templates/lsf_job_header.j2 @@ -29,5 +29,4 @@ {%- for j in job_extra %} #BSUB {{ j }} {%- endfor -%} -{%- endif %} -JOB_ID=${LSB_JOBID%.*} \ No newline at end of file +{%- endif %} \ No newline at end of file diff --git a/dask_jobqueue/templates/oar_job_header.j2 b/dask_jobqueue/templates/oar_job_header.j2 index aa30d126..0e926f5d 100644 --- a/dask_jobqueue/templates/oar_job_header.j2 +++ b/dask_jobqueue/templates/oar_job_header.j2 @@ -18,5 +18,4 @@ {%- for j in job_extra %} #OAR {{ j }} {%- endfor -%} -{%- endif %} -JOB_ID=${OAR_JOB_ID} \ No newline at end of file +{%- endif %} \ No newline at end of file diff --git a/dask_jobqueue/templates/pbs_job_header.j2 b/dask_jobqueue/templates/pbs_job_header.j2 index 660e2dac..f1f71bb5 100644 --- a/dask_jobqueue/templates/pbs_job_header.j2 +++ b/dask_jobqueue/templates/pbs_job_header.j2 @@ -19,5 +19,4 @@ {%- for j in job_extra %} #PBS {{ j }} {%- endfor -%} -{%- endif %} -JOB_ID=${PBS_JOBID%%.*} \ No newline at end of file +{%- endif %} \ No newline at end of file diff --git a/dask_jobqueue/templates/slurm_job_header.j2 b/dask_jobqueue/templates/slurm_job_header.j2 index 0ec74a53..920f0d56 100644 --- a/dask_jobqueue/templates/slurm_job_header.j2 +++ b/dask_jobqueue/templates/slurm_job_header.j2 @@ -21,6 +21,4 @@ {%- for j in job_extra %} #SBATCH {{ j }} {%- endfor -%} -{%- endif %} - -JOB_ID=${SLURM_JOB_ID%;*} \ No newline at end of file +{%- endif %} \ No newline at end of file