Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def __init__(self,

# Keep information on process, threads and memory, for use in
# subclasses
self.worker_memory = parse_bytes(memory) if memory is not None else None
self.worker_memory = parse_bytes(memory)
self.worker_processes = processes
self.worker_cores = cores
self.name = name
Expand All @@ -220,13 +220,11 @@ def __init__(self,
# dask-worker command line build
dask_worker_command = '%(python)s -m distributed.cli.dask_worker' % dict(python=sys.executable)
self._command_template = ' '.join([dask_worker_command, self.scheduler.address])
self._command_template += " --nthreads %d" % self.worker_threads
self._command_template += " --nthreads %d" % self.worker_process_threads
if processes is not None and processes > 1:
self._command_template += " --nprocs %d" % processes

mem = format_bytes(self.worker_memory / self.worker_processes)
mem = mem.replace(' ', '')
self._command_template += " --memory-limit %s" % mem
self._command_template += " --memory-limit %s" % self.worker_process_memory
self._command_template += " --name %s--${JOB_ID}--" % name

if death_timeout is not None:
Expand All @@ -238,7 +236,7 @@ def __init__(self,

def __repr__(self):
running_workers = sum(len(value) for value in self.running_jobs.values())
running_cores = running_workers * self.worker_threads
running_cores = running_workers * self.worker_process_threads
total_jobs = len(self.pending_jobs) + len(self.running_jobs)
total_workers = total_jobs * self.worker_processes
running_memory = running_workers * self.worker_memory / self.worker_processes
Expand All @@ -265,9 +263,20 @@ def finished_jobs(self):
return self._scheduler_plugin.finished_jobs

@property
def worker_threads(self):
def worker_process_threads(self):
return int(self.worker_cores / self.worker_processes)

@property
def worker_process_memory(self):
mem = format_bytes(self.worker_memory / self.worker_processes)
mem = mem.replace(' ', '')
return mem

@property
def worker_info(self):
''' single worker process info needed for scaling on cores or memory '''
return {'cores': self.worker_process_threads, 'memory': self.worker_process_memory}

def job_script(self):
""" Construct a job submission script """
pieces = {'job_header': self.job_header,
Expand Down