Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 52 additions & 31 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,43 +143,43 @@ def __init__(

super().__init__()

default_config_name = self.default_config_name()
if config_name is None:
config_name = getattr(type(self), "config_name")
if config_name is None:
raise ValueError(
"Looks like you are trying to create a class that inherits from dask_jobqueue.core.Job. "
"If that is the case, you need to:\n"
"- set the 'config_name' class variable to a non-None value\n"
"- create a section in jobqueue.yaml with the value of 'config_name'\n"
"If that is not the case, please open an issue in https://github.com/dask/dask-jobqueue/issues."
)
config_name = default_config_name
self.config_name = config_name

if job_name is None:
job_name = dask.config.get("jobqueue.%s.name" % config_name)
job_name = dask.config.get("jobqueue.%s.name" % self.config_name)
if cores is None:
cores = dask.config.get("jobqueue.%s.cores" % config_name)
cores = dask.config.get("jobqueue.%s.cores" % self.config_name)
if memory is None:
memory = dask.config.get("jobqueue.%s.memory" % config_name)
memory = dask.config.get("jobqueue.%s.memory" % self.config_name)
if processes is None:
processes = dask.config.get("jobqueue.%s.processes" % config_name)
processes = dask.config.get("jobqueue.%s.processes" % self.config_name)
if interface is None:
interface = dask.config.get("jobqueue.%s.interface" % config_name)
interface = dask.config.get("jobqueue.%s.interface" % self.config_name)
if death_timeout is None:
death_timeout = dask.config.get("jobqueue.%s.death-timeout" % config_name)
death_timeout = dask.config.get(
"jobqueue.%s.death-timeout" % self.config_name
)
if local_directory is None:
local_directory = dask.config.get(
"jobqueue.%s.local-directory" % config_name
"jobqueue.%s.local-directory" % self.config_name
)
if extra is None:
extra = dask.config.get("jobqueue.%s.extra" % config_name)
extra = dask.config.get("jobqueue.%s.extra" % self.config_name)
if env_extra is None:
env_extra = dask.config.get("jobqueue.%s.env-extra" % config_name)
env_extra = dask.config.get("jobqueue.%s.env-extra" % self.config_name)
if header_skip is None:
header_skip = dask.config.get("jobqueue.%s.header-skip" % config_name, ())
header_skip = dask.config.get(
"jobqueue.%s.header-skip" % self.config_name, ()
)
if log_directory is None:
log_directory = dask.config.get("jobqueue.%s.log-directory" % config_name)
log_directory = dask.config.get(
"jobqueue.%s.log-directory" % self.config_name
)
if shebang is None:
shebang = dask.config.get("jobqueue.%s.shebang" % config_name)
shebang = dask.config.get("jobqueue.%s.shebang" % self.config_name)

if cores is None or memory is None:
job_class_name = self.__class__.__name__
Expand All @@ -191,7 +191,7 @@ def __init__(
)
)

# This attribute should be overridden
# This attribute should be set in the derived class
self.job_header = None

if interface:
Expand Down Expand Up @@ -239,6 +239,18 @@ def __init__(
if not os.path.exists(self.log_directory):
os.makedirs(self.log_directory)

# Return the value of the 'config_name' class attribute of ``cls``.
# Raises ValueError if the attribute is missing or set to None; the message
# names the offending class and asks for a 'config_name' class variable.
@classmethod
def default_config_name(cls):
# The None default makes a missing attribute look the same as one set to None.
config_name = getattr(cls, "config_name", None)
if config_name is None:
raise ValueError(
"The class {} is required to have 'config_name' class variable.\n"
"If you have created this class, please add a 'config_name' class variable.\n"
"If not this may be a bug, feel free to create an issue at: "
"https://github.com/dask/dask-jobqueue/issues/new".format(cls)
)
return config_name

def job_script(self):
""" Construct a job submission script """
header = "\n".join(
Expand Down Expand Up @@ -392,8 +404,6 @@ class JobQueueCluster(SpecCluster):
cluster_parameters=cluster_parameters
)

job_cls = None

def __init__(
self,
n_workers=0,
Expand All @@ -414,18 +424,29 @@ def __init__(
**kwargs
):
self.status = "created"

default_job_cls = getattr(type(self), "job_cls", None)
self.job_cls = default_job_cls

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: I'm not sure I understand this, but I probably lack some understanding of Python.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think I understand now that I see you've deleted the class attribute job_cls from JobQueueCluster.
I'm not sure this helps; I still need some insight into why you prefer it this way.

if job_cls is not None:
self.job_cls = job_cls

if self.job_cls is None:
raise ValueError(
"You must provide a Job type like PBSJob, SLURMJob, "
"or SGEJob with the job_cls= argument."
"You need to specify a Job type. Two cases:\n"
"- you are inheriting from JobQueueCluster (most likely): you need to add a 'job_cls' class variable "
"in your JobQueueCluster-derived class {}\n"
"- you are using JobQueueCluster directly (less likely, only useful for tests): "
"please explicitly pass a Job type through the 'job_cls' parameter.".format(
type(self)
)
)

if config_name:
if interface is None:
interface = dask.config.get("jobqueue.%s.interface" % config_name)
default_config_name = self.job_cls.default_config_name()
if config_name is None:
config_name = default_config_name

if interface is None:
interface = dask.config.get("jobqueue.%s.interface" % config_name)

scheduler = {
"cls": Scheduler, # Use local scheduler for now
Expand All @@ -437,8 +458,8 @@ def __init__(
"security": security,
},
}
if config_name:
kwargs["config_name"] = config_name

kwargs["config_name"] = config_name
kwargs["interface"] = interface
kwargs["protocol"] = protocol
kwargs["security"] = security
Expand Down
19 changes: 10 additions & 9 deletions dask_jobqueue/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,29 @@ class HTCondorJob(Job):
# Python (can't find its libs), so we have to go through the shell.
executable = "/bin/sh"

def __init__(
self, *args, disk=None, job_extra=None, config_name="htcondor", **kwargs
):
config_name = "htcondor"

def __init__(self, *args, disk=None, job_extra=None, config_name=None, **kwargs):
super().__init__(*args, config_name=config_name, **kwargs)

if disk is None:
disk = dask.config.get("jobqueue.%s.disk" % config_name)
disk = dask.config.get("jobqueue.%s.disk" % self.config_name)
if disk is None:
raise ValueError(
"You must specify how much disk to use per job like ``disk='1 GB'``"
)
self.worker_disk = parse_bytes(disk)
if job_extra is None:
self.job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name, {})
self.job_extra = dask.config.get(
"jobqueue.%s.job-extra" % self.config_name, {}
)
else:
self.job_extra = job_extra

# Instantiate args and parameters from parent abstract class
super().__init__(*args, config_name=config_name, **kwargs)

env_extra = kwargs.get("env_extra", None)
if env_extra is None:
env_extra = dask.config.get(
"jobqueue.%s.env-extra" % config_name, default=[]
"jobqueue.%s.env-extra" % self.config_name, default=[]
)
self.env_dict = self.env_lines_to_dict(env_extra)
self.env_dict["JOB_ID"] = "$F(MY.JobId)"
Expand Down
2 changes: 1 addition & 1 deletion dask_jobqueue/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(
resource_spec=None,
walltime=None,
job_extra=None,
config_name="local",
config_name=None,
**kwargs
):
# Instantiate args and parameters from parent abstract class
Expand Down
24 changes: 12 additions & 12 deletions dask_jobqueue/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
class LSFJob(Job):
submit_command = "bsub"
cancel_command = "bkill"
config_name = "lsf"

def __init__(
self,
Expand All @@ -28,34 +29,33 @@ def __init__(
walltime=None,
job_extra=None,
lsf_units=None,
config_name="lsf",
config_name=None,
use_stdin=None,
**kwargs
):
super().__init__(*args, config_name=config_name, **kwargs)

if queue is None:
queue = dask.config.get("jobqueue.%s.queue" % config_name)
queue = dask.config.get("jobqueue.%s.queue" % self.config_name)
if project is None:
project = dask.config.get("jobqueue.%s.project" % config_name)
project = dask.config.get("jobqueue.%s.project" % self.config_name)
if ncpus is None:
ncpus = dask.config.get("jobqueue.%s.ncpus" % config_name)
ncpus = dask.config.get("jobqueue.%s.ncpus" % self.config_name)
if mem is None:
mem = dask.config.get("jobqueue.%s.mem" % config_name)
mem = dask.config.get("jobqueue.%s.mem" % self.config_name)
if walltime is None:
walltime = dask.config.get("jobqueue.%s.walltime" % config_name)
walltime = dask.config.get("jobqueue.%s.walltime" % self.config_name)
if job_extra is None:
job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name)
job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name)
if lsf_units is None:
lsf_units = dask.config.get("jobqueue.%s.lsf-units" % config_name)
lsf_units = dask.config.get("jobqueue.%s.lsf-units" % self.config_name)

if use_stdin is None:
use_stdin = dask.config.get("jobqueue.%s.use-stdin" % config_name)
use_stdin = dask.config.get("jobqueue.%s.use-stdin" % self.config_name)
if use_stdin is None:
use_stdin = lsf_version() < "10"
self.use_stdin = use_stdin

# Instantiate args and parameters from parent abstract class
super().__init__(*args, config_name=config_name, **kwargs)

header_lines = []
# LSF header build
if self.name is not None:
Expand Down
19 changes: 11 additions & 8 deletions dask_jobqueue/oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class OARJob(Job):
submit_command = "oarsub"
cancel_command = "oardel"
job_id_regexp = r"OAR_JOB_ID=(?P<job_id>\d+)"
config_name = "oar"

def __init__(
self,
Expand All @@ -23,21 +24,23 @@ def __init__(
resource_spec=None,
walltime=None,
job_extra=None,
config_name="oar",
config_name=None,
**kwargs
):
super().__init__(*args, config_name=config_name, **kwargs)

if queue is None:
queue = dask.config.get("jobqueue.%s.queue" % config_name)
queue = dask.config.get("jobqueue.%s.queue" % self.config_name)
if project is None:
project = dask.config.get("jobqueue.%s.project" % config_name)
project = dask.config.get("jobqueue.%s.project" % self.config_name)
if resource_spec is None:
resource_spec = dask.config.get("jobqueue.%s.resource-spec" % config_name)
resource_spec = dask.config.get(
"jobqueue.%s.resource-spec" % self.config_name
)
if walltime is None:
walltime = dask.config.get("jobqueue.%s.walltime" % config_name)
walltime = dask.config.get("jobqueue.%s.walltime" % self.config_name)
if job_extra is None:
job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name)

super().__init__(*args, config_name=config_name, **kwargs)
job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name)

header_lines = []
if self.job_name is not None:
Expand Down
19 changes: 10 additions & 9 deletions dask_jobqueue/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,26 @@ def __init__(
resource_spec=None,
walltime=None,
job_extra=None,
config_name="pbs",
config_name=None,
**kwargs
):
super().__init__(*args, config_name=config_name, **kwargs)

if queue is None:
queue = dask.config.get("jobqueue.%s.queue" % config_name)
queue = dask.config.get("jobqueue.%s.queue" % self.config_name)
if resource_spec is None:
resource_spec = dask.config.get("jobqueue.%s.resource-spec" % config_name)
resource_spec = dask.config.get(
"jobqueue.%s.resource-spec" % self.config_name
)
if walltime is None:
walltime = dask.config.get("jobqueue.%s.walltime" % config_name)
walltime = dask.config.get("jobqueue.%s.walltime" % self.config_name)
if job_extra is None:
job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name)
job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name)
if project is None:
project = dask.config.get(
"jobqueue.%s.project" % config_name
"jobqueue.%s.project" % self.config_name
) or os.environ.get("PBS_ACCOUNT")

# Instantiate args and parameters from parent abstract class
super().__init__(*args, config_name=config_name, **kwargs)

# Try to find a project name from environment variable
project = project or os.environ.get("PBS_ACCOUNT")

Expand Down
20 changes: 11 additions & 9 deletions dask_jobqueue/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
class SGEJob(Job):
submit_command = "qsub"
cancel_command = "qdel"
config_name = "sge"

def __init__(
self,
Expand All @@ -19,21 +20,23 @@ def __init__(
resource_spec=None,
walltime=None,
job_extra=None,
config_name="sge",
config_name=None,
**kwargs
):
super().__init__(*args, config_name=config_name, **kwargs)

if queue is None:
queue = dask.config.get("jobqueue.%s.queue" % config_name)
queue = dask.config.get("jobqueue.%s.queue" % self.config_name)
if project is None:
project = dask.config.get("jobqueue.%s.project" % config_name)
project = dask.config.get("jobqueue.%s.project" % self.config_name)
if resource_spec is None:
resource_spec = dask.config.get("jobqueue.%s.resource-spec" % config_name)
resource_spec = dask.config.get(
"jobqueue.%s.resource-spec" % self.config_name
)
if walltime is None:
walltime = dask.config.get("jobqueue.%s.walltime" % config_name)
walltime = dask.config.get("jobqueue.%s.walltime" % self.config_name)
if job_extra is None:
job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name)

super().__init__(*args, config_name=config_name, **kwargs)
job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name)

header_lines = []
if self.job_name is not None:
Expand Down Expand Up @@ -114,4 +117,3 @@ class SGECluster(JobQueueCluster):
job=job_parameters, cluster=cluster_parameters
)
job_cls = SGEJob
config_name = "sge"
Loading