Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 22 additions & 29 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import weakref
import abc

from jinja2 import Environment, PackageLoader

import dask

from distributed.core import Status
Expand Down Expand Up @@ -113,14 +115,6 @@ class Job(ProcessInterface, abc.ABC):
job_parameters=job_parameters
)

_script_template = """
%(shebang)s

%(job_header)s
%(env_header)s
%(worker_command)s
""".lstrip()

# Following class attributes should be overridden by extending classes.
submit_command = None
cancel_command = None
Expand Down Expand Up @@ -236,27 +230,19 @@ def __init__(
self._env_header = "\n".join(filter(None, env_extra))
self.header_skip = set(header_skip)

# dask-worker command line build
dask_worker_command = "%(python)s -m distributed.cli.dask_worker" % dict(
python=python
self._command_template = self.template_env.get_template("command.j2").render(
python=python,
scheduler=self.scheduler,
threads=self.worker_process_threads,
processes=processes,
memory=self.worker_process_memory,
nanny=nanny,
name=name,
death_timeout=death_timeout,
local_directory=local_directory,
interface=interface,
extra=extra,
)
command_args = [dask_worker_command, self.scheduler]
command_args += ["--nthreads", self.worker_process_threads]
if processes is not None and processes > 1:
command_args += ["--nprocs", processes]

command_args += ["--memory-limit", self.worker_process_memory]
command_args += ["--name", str(name)]
command_args += ["--nanny" if nanny else "--no-nanny"]

if death_timeout is not None:
command_args += ["--death-timeout", death_timeout]
if local_directory is not None:
command_args += ["--local-directory", local_directory]
if extra is not None:
command_args += extra

self._command_template = " ".join(map(str, command_args))

self.log_directory = log_directory
if self.log_directory is not None:
Expand All @@ -275,6 +261,13 @@ def default_config_name(cls):
)
return config_name

@property
def template_env(self):
"""
Jinja2 template rendering environment
"""
return Environment(loader=PackageLoader("dask_jobqueue", "templates"))

def job_script(self):
"""Construct a job submission script"""
header = "\n".join(
Expand All @@ -290,7 +283,7 @@ def job_script(self):
"env_header": self._env_header,
"worker_command": self._command_template,
}
return self._script_template % pieces
return self.template_env.get_template("script.sh").render(**pieces)

@contextmanager
def job_file(self):
Expand Down
134 changes: 33 additions & 101 deletions dask_jobqueue/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,6 @@


class HTCondorJob(Job):
_script_template = """
%(shebang)s

%(job_header)s

Environment = "%(quoted_environment)s"
Arguments = "%(quoted_arguments)s"
Executable = %(executable)s

Queue
""".lstrip()

submit_command = "condor_submit"
cancel_command = "condor_rm"
job_id_regexp = r"(?P<job_id>\d+\.\d+)"
Expand Down Expand Up @@ -65,63 +53,21 @@ def __init__(
env_extra = dask.config.get(
"jobqueue.%s.env-extra" % self.config_name, default=[]
)
self.env_dict = self.env_lines_to_dict(env_extra)

self.job_header_dict = {
"MY.DaskWorkerName": '"htcondor--$F(MY.JobId)--"',
"RequestCpus": "MY.DaskWorkerCores",
"RequestMemory": "floor(MY.DaskWorkerMemory / 1048576)",
"RequestDisk": "floor(MY.DaskWorkerDisk / 1024)",
"MY.JobId": '"$(ClusterId).$(ProcId)"',
"MY.DaskWorkerCores": self.worker_cores,
"MY.DaskWorkerMemory": self.worker_memory,
"MY.DaskWorkerDisk": self.worker_disk,
}
if self.log_directory:
self.job_header_dict.update(
{
"LogDirectory": self.log_directory,
# $F(...) strips quotes
"Output": "$(LogDirectory)/worker-$F(MY.JobId).out",
"Error": "$(LogDirectory)/worker-$F(MY.JobId).err",
"Log": "$(LogDirectory)/worker-$(ClusterId).log",
# We kill all the workers to stop them so we need to stream their
# output+error if we ever want to see anything
"Stream_Output": True,
"Stream_Error": True,
}
)
if self.job_extra:
self.job_header_dict.update(self.job_extra)

def env_lines_to_dict(self, env_lines):
"""Convert an array of export statements (what we get from env-extra
in the config) into a dict"""
env_dict = {}
for env_line in env_lines:
split_env_line = shlex.split(env_line)
if split_env_line[0] == "export":
split_env_line = split_env_line[1:]
for item in split_env_line:
if "=" in item:
k, v = item.split("=", 1)
env_dict[k] = v
return env_dict
self.env_extra = env_extra

def job_script(self):
"""Construct a job submission script"""
quoted_arguments = quote_arguments(["-c", self._command_template])
quoted_environment = quote_environment(self.env_dict)
job_header_lines = "\n".join(
"%s = %s" % (k, v) for k, v in self.job_header_dict.items()
return self.template_env.get_template("htcondor_script.sh").render(
shebang=self.shebang,
worker_command=self._command_template,
executable=self.executable,
log_directory=self.log_directory,
worker_cores=self.worker_cores,
worker_disk=self.worker_disk,
worker_memory=self.worker_memory,
job_extra=self.job_extra,
env_extra=self.env_extra,
)
return self._script_template % {
"shebang": self.shebang,
"job_header": job_header_lines,
"quoted_environment": quoted_environment,
"quoted_arguments": quoted_arguments,
"executable": self.executable,
}

def _job_id_from_submit_output(self, out):
cluster_id_regexp = r"submitted to cluster (\d+)"
Expand All @@ -135,47 +81,33 @@ def _job_id_from_submit_output(self, out):
raise ValueError(msg)
return "%s.0" % match.group(1)

@property
def template_env(self):
env = super().template_env
env.filters["env_lines_to_dict"] = env_lines_to_dict
env.filters["quote_environment"] = quote_environment
return env


def env_lines_to_dict(env_lines):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't this be done directly in the template file?

"""Convert an array of export statements (what we get from env-extra
in the config) into a dict"""
env_dict = {}
for env_line in env_lines:
split_env_line = shlex.split(env_line)
if split_env_line[0] == "export":
split_env_line = split_env_line[1:]
for item in split_env_line:
if "=" in item:
k, v = item.split("=", 1)
env_dict[k] = v
return env_dict


def _double_up_quotes(instr):
return instr.replace("'", "''").replace('"', '""')


def quote_arguments(args):
"""Quote a string or list of strings using the Condor submit file "new" argument quoting rules.

Returns
-------
str
The arguments in a quoted form.

Warnings
--------
You will need to surround the result in double-quotes before using it in
the Arguments attribute.

Examples
--------
>>> quote_arguments(["3", "simple", "arguments"])
'3 simple arguments'
>>> quote_arguments(["one", "two with spaces", "three"])
'one \'two with spaces\' three'
>>> quote_arguments(["one", "\"two\"", "spacy 'quoted' argument"])
'one ""two"" \'spacey \'\'quoted\'\' argument\''
"""
if isinstance(args, str):
args_list = [args]
else:
args_list = args

quoted_args = []
for a in args_list:
qa = _double_up_quotes(a)
if " " in qa or "'" in qa:
qa = "'" + qa + "'"
quoted_args.append(qa)
return " ".join(quoted_args)


def quote_environment(env):
"""Quote a dict of strings using the Condor submit file "new" environment quoting rules.

Expand Down
89 changes: 46 additions & 43 deletions dask_jobqueue/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,52 +57,32 @@ def __init__(
use_stdin = dask.config.get("jobqueue.%s.use-stdin" % self.config_name)
self.use_stdin = use_stdin

header_lines = []
# LSF header build
if self.name is not None:
header_lines.append("#BSUB -J %s" % self.job_name)
if self.log_directory is not None:
header_lines.append(
"#BSUB -e %s/%s-%%J.err" % (self.log_directory, self.name or "worker")
)
header_lines.append(
"#BSUB -o %s/%s-%%J.out" % (self.log_directory, self.name or "worker")
)
if queue is not None:
header_lines.append("#BSUB -q %s" % queue)
if project is not None:
header_lines.append('#BSUB -P "%s"' % project)
if ncpus is None:
# Compute default cores specifications
ncpus = self.worker_cores
logger.info(
"ncpus specification for LSF not set, initializing it to %s" % ncpus
)
if ncpus is not None:
header_lines.append("#BSUB -n %s" % ncpus)
if ncpus > 1:
# span[hosts=1] _might_ affect queue waiting
# time, and is not required if ncpus==1
header_lines.append('#BSUB -R "span[hosts=1]"')
if mem is None:
# Compute default memory specifications
mem = self.worker_memory
logger.info(
"mem specification for LSF not set, initializing it to %s bytes" % mem
)
if mem is not None:
lsf_units = lsf_units if lsf_units is not None else lsf_detect_units()
memory_string = lsf_format_bytes_ceil(mem, lsf_units=lsf_units)
header_lines.append("#BSUB -M %s" % memory_string)
if walltime is not None:
header_lines.append("#BSUB -W %s" % walltime)
header_lines.extend(["#BSUB %s" % arg for arg in job_extra])

# Declare class attribute that shall be overridden
self.job_header = "\n".join(header_lines)
self.job_header = self.template_env.get_template("lsf_job_header.j2").render(
name=self.name,
job_name=self.job_name,
log_directory=self.log_directory,
queue=queue,
project=project,
ncpus=ncpus,
worker_cores=self.worker_cores,
mem=mem,
worker_memory=self.worker_memory,
lsf_units=lsf_units,
walltime=walltime,
job_extra=job_extra,
logger=logger,
)

logger.debug("Job script: \n %s" % self.job_script())

@property
def template_env(self):
env = super().template_env
env.filters["set_ncpus"] = set_ncpus
env.filters["set_mem"] = set_mem
env.filters["format_memory"] = format_memory
return env

async def _submit_job(self, script_filename):
if self.use_stdin:
piped_cmd = [self.submit_command + "< " + script_filename + " 2> /dev/null"]
Expand All @@ -112,6 +92,29 @@ async def _submit_job(self, script_filename):
return result


def set_ncpus(ncpus, worker_cores, logger):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for LSF, couldn't we code this in the template file?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible though the templating logic can easily get messy and it is often easier and cleaner to offload more complicated logic to filters (which is exactly what is done here). Additionally, I don't think you can easily log things from within a Jinja template.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was thinking of removing the log, which don't seem really useful to me.

But actually, in this case, we could have kept the ncpus and mem settings in the __init__ method, don't we?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely agree regarding ncpus and mem. They're so simple they should just be in the __init__.

The format_memory function should probably remain as a filter because of the complexity in lsf_format_bytes_ceil and lsf_detect_units

if ncpus is None:
ncpus = worker_cores
logger.info(
"ncpus specification for LSF not set, initializing it to %s" % ncpus
)
return ncpus


def set_mem(mem, worker_memory, logger):
if mem is None:
mem = worker_memory
logger.info(
"mem specification for LSF not set, initializing it to %s bytes" % mem
)
return mem


def format_memory(mem, lsf_units):
lsf_units = lsf_units if lsf_units is not None else lsf_detect_units()
return lsf_format_bytes_ceil(mem, lsf_units=lsf_units)


def lsf_format_bytes_ceil(n, lsf_units="mb"):
"""Format bytes as text

Expand Down
34 changes: 9 additions & 25 deletions dask_jobqueue/oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,15 @@ def __init__(
if job_extra is None:
job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name)

header_lines = []
if self.job_name is not None:
header_lines.append("#OAR -n %s" % self.job_name)
if queue is not None:
header_lines.append("#OAR -q %s" % queue)
if project is not None:
header_lines.append("#OAR --project %s" % project)

# OAR needs to have the resource on a single line otherwise it is
# considered as a "moldable job" (i.e. the scheduler can chose between
# multiple sets of resources constraints)
resource_spec_list = []
if resource_spec is None:
# default resource_spec if not specified. Crucial to specify
# nodes=1 to make sure the cores allocated are on the same node.
resource_spec = "/nodes=1/core=%d" % self.worker_cores
resource_spec_list.append(resource_spec)
if walltime is not None:
resource_spec_list.append("walltime=%s" % walltime)

full_resource_spec = ",".join(resource_spec_list)
header_lines.append("#OAR -l %s" % full_resource_spec)
header_lines.extend(["#OAR %s" % arg for arg in job_extra])

self.job_header = "\n".join(header_lines)
self.job_header = self.template_env.get_template("oar_job_header.j2").render(
job_name=self.job_name,
queue=queue,
project=project,
resource_spec=resource_spec,
walltime=walltime,
worker_cores=self.worker_cores,
job_extra=job_extra,
)

logger.debug("Job script: \n %s" % self.job_script())

Expand Down
Loading