From ed8d0581e977d6f7d82d96c1d98d4730b3e344f6 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Fri, 24 Aug 2018 13:10:48 +0000 Subject: [PATCH] Adding info for scaling with cores and memory --- dask_jobqueue/core.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index e26ea02e..95ea08a6 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -204,7 +204,7 @@ def __init__(self, # Keep information on process, threads and memory, for use in # subclasses - self.worker_memory = parse_bytes(memory) if memory is not None else None + self.worker_memory = parse_bytes(memory) self.worker_processes = processes self.worker_cores = cores self.name = name @@ -220,13 +220,11 @@ def __init__(self, # dask-worker command line build dask_worker_command = '%(python)s -m distributed.cli.dask_worker' % dict(python=sys.executable) self._command_template = ' '.join([dask_worker_command, self.scheduler.address]) - self._command_template += " --nthreads %d" % self.worker_threads + self._command_template += " --nthreads %d" % self.worker_process_threads if processes is not None and processes > 1: self._command_template += " --nprocs %d" % processes - mem = format_bytes(self.worker_memory / self.worker_processes) - mem = mem.replace(' ', '') - self._command_template += " --memory-limit %s" % mem + self._command_template += " --memory-limit %s" % self.worker_process_memory self._command_template += " --name %s--${JOB_ID}--" % name if death_timeout is not None: @@ -238,7 +236,7 @@ def __init__(self, def __repr__(self): running_workers = sum(len(value) for value in self.running_jobs.values()) - running_cores = running_workers * self.worker_threads + running_cores = running_workers * self.worker_process_threads total_jobs = len(self.pending_jobs) + len(self.running_jobs) total_workers = total_jobs * self.worker_processes running_memory = running_workers * self.worker_memory / self.worker_processes @@ -265,9 +263,20 @@ def finished_jobs(self): return self._scheduler_plugin.finished_jobs @property - def worker_threads(self): + def worker_process_threads(self): return int(self.worker_cores / self.worker_processes) + @property + def worker_process_memory(self): + mem = format_bytes(self.worker_memory / self.worker_processes) + mem = mem.replace(' ', '') + return mem + + @property + def worker_info(self): + ''' single worker process info needed for scaling on cores or memory ''' + return {'cores': self.worker_process_threads, 'memory': self.worker_process_memory} + def job_script(self): """ Construct a job submission script """ pieces = {'job_header': self.job_header,