diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index f546a1cc..38439955 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -9,6 +9,8 @@ import weakref import abc +from jinja2 import Environment, PackageLoader + import dask from distributed.core import Status @@ -113,14 +115,6 @@ class Job(ProcessInterface, abc.ABC): job_parameters=job_parameters ) - _script_template = """ -%(shebang)s - -%(job_header)s -%(env_header)s -%(worker_command)s -""".lstrip() - # Following class attributes should be overridden by extending classes. submit_command = None cancel_command = None @@ -236,27 +230,19 @@ def __init__( self._env_header = "\n".join(filter(None, env_extra)) self.header_skip = set(header_skip) - # dask-worker command line build - dask_worker_command = "%(python)s -m distributed.cli.dask_worker" % dict( - python=python + self._command_template = self.template_env.get_template("command.j2").render( + python=python, + scheduler=self.scheduler, + threads=self.worker_process_threads, + processes=processes, + memory=self.worker_process_memory, + nanny=nanny, + name=name, + death_timeout=death_timeout, + local_directory=local_directory, + interface=interface, + extra=extra, ) - command_args = [dask_worker_command, self.scheduler] - command_args += ["--nthreads", self.worker_process_threads] - if processes is not None and processes > 1: - command_args += ["--nprocs", processes] - - command_args += ["--memory-limit", self.worker_process_memory] - command_args += ["--name", str(name)] - command_args += ["--nanny" if nanny else "--no-nanny"] - - if death_timeout is not None: - command_args += ["--death-timeout", death_timeout] - if local_directory is not None: - command_args += ["--local-directory", local_directory] - if extra is not None: - command_args += extra - - self._command_template = " ".join(map(str, command_args)) self.log_directory = log_directory if self.log_directory is not None: @@ -275,6 +261,13 @@ def default_config_name(cls): ) return config_name + @property + def template_env(self): + """ + Jinja2 template rendering environment + """ + return Environment(loader=PackageLoader("dask_jobqueue", "templates")) + def job_script(self): """Construct a job submission script""" header = "\n".join( @@ -290,7 +283,7 @@ def job_script(self): "env_header": self._env_header, "worker_command": self._command_template, } - return self._script_template % pieces + return self.template_env.get_template("script.sh").render(**pieces) @contextmanager def job_file(self): diff --git a/dask_jobqueue/htcondor.py b/dask_jobqueue/htcondor.py index 981b2672..72e54a07 100644 --- a/dask_jobqueue/htcondor.py +++ b/dask_jobqueue/htcondor.py @@ -11,18 +11,6 @@ class HTCondorJob(Job): - _script_template = """ -%(shebang)s - -%(job_header)s - -Environment = "%(quoted_environment)s" -Arguments = "%(quoted_arguments)s" -Executable = %(executable)s - -Queue -""".lstrip() - submit_command = "condor_submit" cancel_command = "condor_rm" job_id_regexp = r"(?P\d+\.\d+)" @@ -65,63 +53,21 @@ def __init__( env_extra = dask.config.get( "jobqueue.%s.env-extra" % self.config_name, default=[] ) - self.env_dict = self.env_lines_to_dict(env_extra) - - self.job_header_dict = { - "MY.DaskWorkerName": '"htcondor--$F(MY.JobId)--"', - "RequestCpus": "MY.DaskWorkerCores", - "RequestMemory": "floor(MY.DaskWorkerMemory / 1048576)", - "RequestDisk": "floor(MY.DaskWorkerDisk / 1024)", - "MY.JobId": '"$(ClusterId).$(ProcId)"', - "MY.DaskWorkerCores": self.worker_cores, - "MY.DaskWorkerMemory": self.worker_memory, - "MY.DaskWorkerDisk": self.worker_disk, - } - if self.log_directory: - self.job_header_dict.update( - { - "LogDirectory": self.log_directory, - # $F(...) strips quotes - "Output": "$(LogDirectory)/worker-$F(MY.JobId).out", - "Error": "$(LogDirectory)/worker-$F(MY.JobId).err", - "Log": "$(LogDirectory)/worker-$(ClusterId).log", - # We kill all the workers to stop them so we need to stream their - # output+error if we ever want to see anything - "Stream_Output": True, - "Stream_Error": True, - } - ) - if self.job_extra: - self.job_header_dict.update(self.job_extra) - - def env_lines_to_dict(self, env_lines): - """Convert an array of export statements (what we get from env-extra - in the config) into a dict""" - env_dict = {} - for env_line in env_lines: - split_env_line = shlex.split(env_line) - if split_env_line[0] == "export": - split_env_line = split_env_line[1:] - for item in split_env_line: - if "=" in item: - k, v = item.split("=", 1) - env_dict[k] = v - return env_dict + self.env_extra = env_extra def job_script(self): """Construct a job submission script""" - quoted_arguments = quote_arguments(["-c", self._command_template]) - quoted_environment = quote_environment(self.env_dict) - job_header_lines = "\n".join( - "%s = %s" % (k, v) for k, v in self.job_header_dict.items() + return self.template_env.get_template("htcondor_script.sh").render( + shebang=self.shebang, + worker_command=self._command_template, + executable=self.executable, + log_directory=self.log_directory, + worker_cores=self.worker_cores, + worker_disk=self.worker_disk, + worker_memory=self.worker_memory, + job_extra=self.job_extra, + env_extra=self.env_extra, ) - return self._script_template % { - "shebang": self.shebang, - "job_header": job_header_lines, - "quoted_environment": quoted_environment, - "quoted_arguments": quoted_arguments, - "executable": self.executable, - } def _job_id_from_submit_output(self, out): cluster_id_regexp = r"submitted to cluster (\d+)" @@ -135,47 +81,33 @@ def _job_id_from_submit_output(self, out): raise ValueError(msg) return "%s.0" % match.group(1) + @property + def template_env(self): + env = super().template_env + env.filters["env_lines_to_dict"] = env_lines_to_dict + env.filters["quote_environment"] = quote_environment + return env + + +def env_lines_to_dict(env_lines): + """Convert an array of export statements (what we get from env-extra + in the config) into a dict""" + env_dict = {} + for env_line in env_lines: + split_env_line = shlex.split(env_line) + if split_env_line[0] == "export": + split_env_line = split_env_line[1:] + for item in split_env_line: + if "=" in item: + k, v = item.split("=", 1) + env_dict[k] = v + return env_dict + def _double_up_quotes(instr): return instr.replace("'", "''").replace('"', '""') -def quote_arguments(args): - """Quote a string or list of strings using the Condor submit file "new" argument quoting rules. - - Returns - ------- - str - The arguments in a quoted form. - - Warnings - -------- - You will need to surround the result in double-quotes before using it in - the Arguments attribute. - - Examples - -------- - >>> quote_arguments(["3", "simple", "arguments"]) - '3 simple arguments' - >>> quote_arguments(["one", "two with spaces", "three"]) - 'one \'two with spaces\' three' - >>> quote_arguments(["one", "\"two\"", "spacy 'quoted' argument"]) - 'one ""two"" \'spacey \'\'quoted\'\' argument\'' - """ - if isinstance(args, str): - args_list = [args] - else: - args_list = args - - quoted_args = [] - for a in args_list: - qa = _double_up_quotes(a) - if " " in qa or "'" in qa: - qa = "'" + qa + "'" - quoted_args.append(qa) - return " ".join(quoted_args) - - def quote_environment(env): """Quote a dict of strings using the Condor submit file "new" environment quoting rules. diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 3675976c..0db0a79c 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -57,52 +57,32 @@ def __init__( use_stdin = dask.config.get("jobqueue.%s.use-stdin" % self.config_name) self.use_stdin = use_stdin - header_lines = [] - # LSF header build - if self.name is not None: - header_lines.append("#BSUB -J %s" % self.job_name) - if self.log_directory is not None: - header_lines.append( - "#BSUB -e %s/%s-%%J.err" % (self.log_directory, self.name or "worker") - ) - header_lines.append( - "#BSUB -o %s/%s-%%J.out" % (self.log_directory, self.name or "worker") - ) - if queue is not None: - header_lines.append("#BSUB -q %s" % queue) - if project is not None: - header_lines.append('#BSUB -P "%s"' % project) - if ncpus is None: - # Compute default cores specifications - ncpus = self.worker_cores - logger.info( - "ncpus specification for LSF not set, initializing it to %s" % ncpus - ) - if ncpus is not None: - header_lines.append("#BSUB -n %s" % ncpus) - if ncpus > 1: - # span[hosts=1] _might_ affect queue waiting - # time, and is not required if ncpus==1 - header_lines.append('#BSUB -R "span[hosts=1]"') - if mem is None: - # Compute default memory specifications - mem = self.worker_memory - logger.info( - "mem specification for LSF not set, initializing it to %s bytes" % mem - ) - if mem is not None: - lsf_units = lsf_units if lsf_units is not None else lsf_detect_units() - memory_string = lsf_format_bytes_ceil(mem, lsf_units=lsf_units) - header_lines.append("#BSUB -M %s" % memory_string) - if walltime is not None: - header_lines.append("#BSUB -W %s" % walltime) - header_lines.extend(["#BSUB %s" % arg for arg in job_extra]) - - # Declare class attribute that shall be overridden - self.job_header = "\n".join(header_lines) + self.job_header = self.template_env.get_template("lsf_job_header.j2").render( + name=self.name, + job_name=self.job_name, + log_directory=self.log_directory, + queue=queue, + project=project, + ncpus=ncpus, + worker_cores=self.worker_cores, + mem=mem, + worker_memory=self.worker_memory, + lsf_units=lsf_units, + walltime=walltime, + job_extra=job_extra, + logger=logger, + ) logger.debug("Job script: \n %s" % self.job_script()) + @property + def template_env(self): + env = super().template_env + env.filters["set_ncpus"] = set_ncpus + env.filters["set_mem"] = set_mem + env.filters["format_memory"] = format_memory + return env + async def _submit_job(self, script_filename): if self.use_stdin: piped_cmd = [self.submit_command + "< " + script_filename + " 2> /dev/null"] @@ -112,6 +92,29 @@ async def _submit_job(self, script_filename): return result +def set_ncpus(ncpus, worker_cores, logger): + if ncpus is None: + ncpus = worker_cores + logger.info( + "ncpus specification for LSF not set, initializing it to %s" % ncpus + ) + return ncpus + + +def set_mem(mem, worker_memory, logger): + if mem is None: + mem = worker_memory + logger.info( + "mem specification for LSF not set, initializing it to %s bytes" % mem + ) + return mem + + +def format_memory(mem, lsf_units): + lsf_units = lsf_units if lsf_units is not None else lsf_detect_units() + return lsf_format_bytes_ceil(mem, lsf_units=lsf_units) + + def lsf_format_bytes_ceil(n, lsf_units="mb"): """Format bytes as text diff --git a/dask_jobqueue/oar.py b/dask_jobqueue/oar.py index 13e8cceb..7f24159d 100644 --- a/dask_jobqueue/oar.py +++ b/dask_jobqueue/oar.py @@ -45,31 +45,15 @@ def __init__( if job_extra is None: job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name) - header_lines = [] - if self.job_name is not None: - header_lines.append("#OAR -n %s" % self.job_name) - if queue is not None: - header_lines.append("#OAR -q %s" % queue) - if project is not None: - header_lines.append("#OAR --project %s" % project) - - # OAR needs to have the resource on a single line otherwise it is - # considered as a "moldable job" (i.e. the scheduler can chose between - # multiple sets of resources constraints) - resource_spec_list = [] - if resource_spec is None: - # default resource_spec if not specified. Crucial to specify - # nodes=1 to make sure the cores allocated are on the same node. - resource_spec = "/nodes=1/core=%d" % self.worker_cores - resource_spec_list.append(resource_spec) - if walltime is not None: - resource_spec_list.append("walltime=%s" % walltime) - - full_resource_spec = ",".join(resource_spec_list) - header_lines.append("#OAR -l %s" % full_resource_spec) - header_lines.extend(["#OAR %s" % arg for arg in job_extra]) - - self.job_header = "\n".join(header_lines) + self.job_header = self.template_env.get_template("oar_job_header.j2").render( + job_name=self.job_name, + queue=queue, + project=project, + resource_spec=resource_spec, + walltime=walltime, + worker_cores=self.worker_cores, + job_extra=job_extra, + ) logger.debug("Job script: \n %s" % self.job_script()) diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index b89d7c7e..c9102032 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -34,6 +34,19 @@ def pbs_format_bytes_ceil(n): return "%dB" % n +def pbs_format_resource_spec(resource_spec, worker_cores, worker_memory): + if resource_spec is None: + # Compute default resources specifications + resource_spec = "select=1:ncpus=%d" % worker_cores + memory_string = pbs_format_bytes_ceil(worker_memory) + resource_spec += ":mem=" + memory_string + logger.info( + "Resource specification for PBS not set, initializing it to %s" + % resource_spec + ) + return resource_spec + + class PBSJob(Job): submit_command = "qsub" cancel_command = "qdel" @@ -73,37 +86,26 @@ def __init__( # Try to find a project name from environment variable project = project or os.environ.get("PBS_ACCOUNT") - header_lines = [] - # PBS header build - if self.job_name is not None: - header_lines.append("#PBS -N %s" % self.job_name) - if queue is not None: - header_lines.append("#PBS -q %s" % queue) - if project is not None: - header_lines.append("#PBS -A %s" % project) - if resource_spec is None: - # Compute default resources specifications - resource_spec = "select=1:ncpus=%d" % self.worker_cores - memory_string = pbs_format_bytes_ceil(self.worker_memory) - resource_spec += ":mem=" + memory_string - logger.info( - "Resource specification for PBS not set, initializing it to %s" - % resource_spec - ) - if resource_spec is not None: - header_lines.append("#PBS -l %s" % resource_spec) - if walltime is not None: - header_lines.append("#PBS -l walltime=%s" % walltime) - if self.log_directory is not None: - header_lines.append("#PBS -e %s/" % self.log_directory) - header_lines.append("#PBS -o %s/" % self.log_directory) - header_lines.extend(["#PBS %s" % arg for arg in job_extra]) - - # Declare class attribute that shall be overridden - self.job_header = "\n".join(header_lines) + self.job_header = self.template_env.get_template("pbs_job_header.j2").render( + job_name=self.job_name, + queue=queue, + project=project, + resource_spec=resource_spec, + worker_cores=self.worker_cores, + worker_memory=self.worker_memory, + walltime=walltime, + log_directory=self.log_directory, + job_extra=job_extra, + ) logger.debug("Job script: \n %s" % self.job_script()) + @property + def template_env(self): + env = super().template_env + env.filters["pbs_format_resource_spec"] = pbs_format_resource_spec + return env + class PBSCluster(JobQueueCluster): __doc__ = """ Launch Dask on a PBS cluster diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 10fa24a6..39f4861c 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -41,34 +41,15 @@ def __init__( if job_extra is None: job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name) - header_lines = [] - if self.job_name is not None: - header_lines.append("#$ -N %(job-name)s") - if queue is not None: - header_lines.append("#$ -q %(queue)s") - if project is not None: - header_lines.append("#$ -P %(project)s") - if resource_spec is not None: - header_lines.append("#$ -l %(resource_spec)s") - if walltime is not None: - header_lines.append("#$ -l h_rt=%(walltime)s") - if self.log_directory is not None: - header_lines.append("#$ -e %(log_directory)s/") - header_lines.append("#$ -o %(log_directory)s/") - header_lines.extend(["#$ -cwd", "#$ -j y"]) - header_lines.extend(["#$ %s" % arg for arg in job_extra]) - header_template = "\n".join(header_lines) - - config = { - "job-name": self.job_name, - "queue": queue, - "project": project, - "processes": self.worker_processes, - "walltime": walltime, - "resource_spec": resource_spec, - "log_directory": self.log_directory, - } - self.job_header = header_template % config + self.job_header = self.template_env.get_template("sge_job_header.j2").render( + job_name=self.job_name, + queue=queue, + project=project, + walltime=walltime, + resource_spec=resource_spec, + log_directory=self.log_directory, + job_extra=job_extra, + ) logger.debug("Job script: \n %s" % self.job_script()) diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index e2871b02..a5f80b82 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -44,43 +44,30 @@ def __init__( if job_extra is None: job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name) - header_lines = [] - # SLURM header build - if self.job_name is not None: - header_lines.append("#SBATCH -J %s" % self.job_name) - if self.log_directory is not None: - header_lines.append( - "#SBATCH -e %s/%s-%%J.err" - % (self.log_directory, self.job_name or "worker") - ) - header_lines.append( - "#SBATCH -o %s/%s-%%J.out" - % (self.log_directory, self.job_name or "worker") - ) - if queue is not None: - header_lines.append("#SBATCH -p %s" % queue) - if project is not None: - header_lines.append("#SBATCH -A %s" % project) - - # Init resources, always 1 task, - # and then number of cpu is processes * threads if not set - header_lines.append("#SBATCH -n 1") - header_lines.append( - "#SBATCH --cpus-per-task=%d" % (job_cpu or self.worker_cores) + self.job_header = self.template_env.get_template("slurm_job_header.j2").render( + job_name=self.job_name, + log_directory=self.log_directory, + queue=queue, + project=project, + job_cpu=job_cpu, + worker_cores=self.worker_cores, + memory=job_mem, + worker_memory=self.worker_memory, + walltime=walltime, + job_extra=job_extra, ) - # Memory - memory = job_mem - if job_mem is None: - memory = slurm_format_bytes_ceil(self.worker_memory) - if memory is not None: - header_lines.append("#SBATCH --mem=%s" % memory) - if walltime is not None: - header_lines.append("#SBATCH -t %s" % walltime) - header_lines.extend(["#SBATCH %s" % arg for arg in job_extra]) + @property + def template_env(self): + env = super().template_env + env.filters["slurm_format_memory"] = slurm_format_memory + return env + - # Declare class attribute that shall be overridden - self.job_header = "\n".join(header_lines) +def slurm_format_memory(memory, worker_memory): + if memory is None: + memory = slurm_format_bytes_ceil(worker_memory) + return memory def slurm_format_bytes_ceil(n): diff --git a/dask_jobqueue/templates/command.j2 b/dask_jobqueue/templates/command.j2 new file mode 100644 index 00000000..a331dbb5 --- /dev/null +++ b/dask_jobqueue/templates/command.j2 @@ -0,0 +1,22 @@ +{{python}} -m distributed.cli.dask_worker {{scheduler}} --nthreads {{threads}}{{" "}} +{%- if processes is not none and processes > 1 -%} +--nprocs {{processes}}{{" "}} +{%- endif -%} +--memory-limit {{memory}} --name {{name}}{{" "}} +{%- if nanny -%} +--nanny +{%- else -%} +--no-nanny +{%- endif -%}{{" "}} +{%- if death_timeout is not none -%} +--death-timeout {{ death_timeout }}{{" "}} +{%- endif -%} +{%- if local_directory is not none -%} +--local-directory {{ local_directory }}{{" "}} +{%- endif -%} +{%- if extra is not none -%} +{{ extra | join(" ") }}{{" "}} +{%- endif -%} +{%- if interface -%} +--interface {{ interface }} +{%- endif -%} diff --git a/dask_jobqueue/templates/htcondor_script.sh b/dask_jobqueue/templates/htcondor_script.sh new file mode 100644 index 00000000..5845adb3 --- /dev/null +++ b/dask_jobqueue/templates/htcondor_script.sh @@ -0,0 +1,29 @@ +{{ shebang }} + +MY.DaskWorkerName = "htcondor--$F(MY.JobId)--" +RequestCpus = MY.DaskWorkerCores +RequestMemory = floor(MY.DaskWorkerMemory / 1048576) +RequestDisk = floor(MY.DaskWorkerDisk / 1024) +MY.JobId = "$(ClusterId).$(ProcId)" +MY.DaskWorkerCores = {{ worker_cores }} +MY.DaskWorkerMemory = {{ worker_memory }} +MY.DaskWorkerDisk = {{ worker_disk }} +{%- if job_extra is not none -%} +{%- for k,v in job_extra.items() %} +{{ k }} = {{ v }} +{%- endfor -%} +{%- endif %} +{% if log_directory is not none -%} +LogDirectory = {{ log_directory }} +Output = $(LogDirectory)/worker-$F(MY.JobId).out +Error = $(LogDirectory)/worker-$F(MY.JobId).err +Log = $(LogDirectory)/worker-$(ClusterId).log +Stream_Output = True +Stream_Error = True +{%- endif %} + +Environment = "{{ env_extra | env_lines_to_dict | quote_environment }}" +Arguments = "-c '{{ worker_command }}'" +Executable = {{ executable }} + +Queue \ No newline at end of file diff --git a/dask_jobqueue/templates/lsf_job_header.j2 b/dask_jobqueue/templates/lsf_job_header.j2 new file mode 100644 index 00000000..9af17eca --- /dev/null +++ b/dask_jobqueue/templates/lsf_job_header.j2 @@ -0,0 +1,32 @@ +{% if name is not none -%} +#BSUB -J {{ job_name }} +{%- endif %} +{% if log_directory is not none -%} +#BSUB -e {{ log_directory }}/{{ name if name is not none else 'worker' }}-%J.err +#BSUB -o {{ log_directory }}/{{ name if name is not none else 'worker' }}-%J.out +{%- endif %} +{% if queue is not none -%} +#BSUB -q {{ queue }} +{%- endif %} +{% if project is not none -%} +#BSUB -P "{{ project }}" +{%- endif %} +{% set ncpus = ncpus | set_ncpus(worker_cores, logger) -%} +{% if ncpus is not none -%} +#BSUB -n {{ ncpus }} +{% if ncpus > 1 -%} +#BSUB -R "span[hosts=1]" +{%- endif %} +{%- endif %} +{% set mem = mem | set_mem(worker_memory, logger) -%} +{% if mem is not none -%} +#BSUB -M {{ mem | format_memory(lsf_units) }} +{%- endif %} +{% if walltime is not none -%} +#BSUB -W {{ walltime }} +{%- endif %} +{%- if job_extra is not none -%} +{%- for j in job_extra %} +#BSUB {{ j }} +{%- endfor -%} +{%- endif %} \ No newline at end of file diff --git a/dask_jobqueue/templates/oar_job_header.j2 b/dask_jobqueue/templates/oar_job_header.j2 new file mode 100644 index 00000000..0e926f5d --- /dev/null +++ b/dask_jobqueue/templates/oar_job_header.j2 @@ -0,0 +1,21 @@ +{% if job_name is not none -%} +#OAR -n {{ job_name }} +{%- endif %} +{% if queue is not none -%} +#OAR -q {{ queue }} +{%- endif %} +{% if project is not none -%} +#OAR --project {{ project }} +{%- endif %} +#OAR -l{{ " " }} +{%- if resource_spec is none %}/nodes=1/core={{ worker_cores }} +{%- else %}{{ resource_spec }} +{%- endif %} +{%- if walltime is not none -%} +,walltime={{ walltime }} +{%- endif %} +{%- if job_extra is not none -%} +{%- for j in job_extra %} +#OAR {{ j }} +{%- endfor -%} +{%- endif %} \ No newline at end of file diff --git a/dask_jobqueue/templates/pbs_job_header.j2 b/dask_jobqueue/templates/pbs_job_header.j2 new file mode 100644 index 00000000..f1f71bb5 --- /dev/null +++ b/dask_jobqueue/templates/pbs_job_header.j2 @@ -0,0 +1,22 @@ +{% if job_name is not none -%} +#PBS -N {{ job_name }} +{%- endif %} +{% if queue is not none -%} +#PBS -q {{ queue }} +{%- endif %} +{% if project is not none -%} +#PBS -A {{ project }} +{%- endif %} +#PBS -l {{ resource_spec | pbs_format_resource_spec(worker_cores, worker_memory) }} +{% if walltime is not none -%} +#PBS -l walltime={{ walltime }} +{%- endif %} +{% if log_directory is not none -%} +#PBS -o {{ log_directory }}/ +#PBS -e {{ log_directory }}/ +{%- endif %} +{%- if job_extra is not none -%} +{%- for j in job_extra %} +#PBS {{ j }} +{%- endfor -%} +{%- endif %} \ No newline at end of file diff --git a/dask_jobqueue/templates/script.sh b/dask_jobqueue/templates/script.sh new file mode 100644 index 00000000..61277c4f --- /dev/null +++ b/dask_jobqueue/templates/script.sh @@ -0,0 +1,5 @@ +{{shebang}} + +{{job_header}} +{{env_header}} +{{worker_command}} diff --git a/dask_jobqueue/templates/sge_job_header.j2 b/dask_jobqueue/templates/sge_job_header.j2 new file mode 100644 index 00000000..0a7c4728 --- /dev/null +++ b/dask_jobqueue/templates/sge_job_header.j2 @@ -0,0 +1,26 @@ +{% if job_name is not none -%} +#$ -N {{ job_name }} +{%- endif %} +{% if queue is not none -%} +#$ -q {{ queue }} +{%- endif %} +{% if project is not none -%} +#$ -P {{ project }} +{%- endif %} +{% if resource_spec is not none -%} +#$ -l {{ resource_spec }} +{%- endif %} +{% if walltime is not none -%} +#$ -l h_rt={{ walltime }} +{%- endif %} +{% if log_directory is not none -%} +#$ -e {{ log_directory }}/ +#$ -o {{ log_directory }}/ +{%- endif %} +#$ -cwd +#$ -j y +{%- if job_extra is not none -%} +{%- for j in job_extra %} +#$ {{ j }} +{%- endfor -%} +{%- endif %} \ No newline at end of file diff --git a/dask_jobqueue/templates/slurm_job_header.j2 b/dask_jobqueue/templates/slurm_job_header.j2 new file mode 100644 index 00000000..920f0d56 --- /dev/null +++ b/dask_jobqueue/templates/slurm_job_header.j2 @@ -0,0 +1,24 @@ +{% if job_name is not none -%} +#SBATCH -J {{ job_name }} +{%- endif %} +{% if log_directory is not none -%} +#SBATCH -o {{ log_directory }}/{{ job_name if job_name is not none else "worker" }}-%J.err +#SBATCH -e {{ log_directory }}/{{ job_name if job_name is not none else "worker" }}-%J.out +{%- endif %} +{% if queue is not none -%} +#SBATCH -p {{ queue }} +{%- endif %} +{% if project is not none -%} +#SBATCH -A {{ project }} +{%- endif %} +#SBATCH -n 1 +#SBATCH --cpus-per-task={{ job_cpu if job_cpu is not none else worker_cores }} +#SBATCH --mem={{ memory | slurm_format_memory(worker_memory) }} +{% if walltime is not none -%} +#SBATCH -t {{ walltime }} +{%- endif %} +{%- if job_extra is not none -%} +{%- for j in job_extra %} +#SBATCH {{ j }} +{%- endfor -%} +{%- endif %} \ No newline at end of file diff --git a/dask_jobqueue/tests/test_htcondor.py b/dask_jobqueue/tests/test_htcondor.py index 6e163fc9..3b551d7d 100644 --- a/dask_jobqueue/tests/test_htcondor.py +++ b/dask_jobqueue/tests/test_htcondor.py @@ -11,13 +11,6 @@ QUEUE_WAIT = 30 # seconds -def test_header(): - with HTCondorCluster(cores=1, memory="100MB", disk="100MB") as cluster: - assert cluster._dummy_job.job_header_dict["MY.DaskWorkerCores"] == 1 - assert cluster._dummy_job.job_header_dict["MY.DaskWorkerDisk"] == 100000000 - assert cluster._dummy_job.job_header_dict["MY.DaskWorkerMemory"] == 100000000 - - def test_job_script(): with HTCondorCluster( cores=4, diff --git a/requirements.txt b/requirements.txt index e1a85007..ab270ad7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ dask>=2.19 distributed>=2.19 +Jinja2 diff --git a/setup.py b/setup.py index e5966511..e301f6c8 100755 --- a/setup.py +++ b/setup.py @@ -23,6 +23,7 @@ python_requires=">=3.6", license="BSD 3-Clause", packages=["dask_jobqueue"], + package_data={"dask_jobqueue": ["templates/*"]}, include_package_data=True, install_requires=install_requires, tests_require=["pytest >= 2.7.1"],