diff --git a/ci/none.sh b/ci/none.sh index 34d26174..f4c1a042 100644 --- a/ci/none.sh +++ b/ci/none.sh @@ -4,7 +4,8 @@ function jobqueue_before_install { # Install miniconda ./ci/conda_setup.sh export PATH="$HOME/miniconda/bin:$PATH" - conda install --yes -c conda-forge python=$TRAVIS_PYTHON_VERSION dask distributed flake8 black pytest docrep + conda install --yes -c conda-forge python=$TRAVIS_PYTHON_VERSION dask distributed flake8 black pytest pytest-asyncio + pip install git+https://github.com/dask/distributed@master --upgrade --no-deps } function jobqueue_install { diff --git a/ci/pbs.sh b/ci/pbs.sh index 2229d812..5fb068df 100644 --- a/ci/pbs.sh +++ b/ci/pbs.sh @@ -19,7 +19,7 @@ function jobqueue_install { } function jobqueue_script { - docker exec -it -u pbsuser pbs_master /bin/bash -c "cd /dask-jobqueue; pytest dask_jobqueue --verbose -E pbs" + docker exec -it -u pbsuser pbs_master /bin/bash -c "cd; pytest /dask-jobqueue/dask_jobqueue --verbose -s -E pbs" } function jobqueue_after_script { diff --git a/ci/pbs/Dockerfile b/ci/pbs/Dockerfile index 1013c91e..8c4a2aa7 100644 --- a/ci/pbs/Dockerfile +++ b/ci/pbs/Dockerfile @@ -30,7 +30,8 @@ RUN curl -o miniconda.sh https://repo.continuum.io/miniconda/Miniconda3-latest-L bash miniconda.sh -f -b -p /opt/anaconda && \ /opt/anaconda/bin/conda clean -tipy && \ rm -f miniconda.sh -RUN conda install --yes -c conda-forge python=3.6 dask distributed flake8 pytest docrep +RUN conda install --yes -c conda-forge python=3.6 dask distributed flake8 pytest pytest-asyncio +RUN pip install git+https://github.com/dask/distributed --upgrade --no-deps # Copy entrypoint and other needed scripts COPY ./*.sh / diff --git a/ci/sge.sh b/ci/sge.sh index 1673e9b3..1f244283 100644 --- a/ci/sge.sh +++ b/ci/sge.sh @@ -17,7 +17,7 @@ function jobqueue_install { } function jobqueue_script { - docker exec -it sge_master /bin/bash -c "cd /dask-jobqueue; pytest dask_jobqueue --verbose -E sge" + docker exec -it sge_master /bin/bash -c "cd /dask-jobqueue; pytest dask_jobqueue --verbose -s -E sge" } function jobqueue_after_script { diff --git a/ci/sge/Dockerfile-master b/ci/sge/Dockerfile-master index d596fd60..d6e486d2 100644 --- a/ci/sge/Dockerfile-master +++ b/ci/sge/Dockerfile-master @@ -10,7 +10,8 @@ RUN curl -o miniconda.sh https://repo.continuum.io/miniconda/Miniconda3-latest-L rm -f miniconda.sh ENV PATH /opt/anaconda/bin:$PATH ARG PYTHON_VERSION -RUN conda install -c conda-forge python=$PYTHON_VERSION dask distributed pytest && conda clean -tipy +RUN conda install -c conda-forge python=$PYTHON_VERSION dask distributed pytest pytest-asyncio && conda clean -tipy +RUN pip install git+https://github.com/dask/distributed --upgrade --no-deps COPY ./*.sh / COPY ./*.txt / diff --git a/ci/sge/Dockerfile-slave b/ci/sge/Dockerfile-slave index d97647cf..777547ef 100644 --- a/ci/sge/Dockerfile-slave +++ b/ci/sge/Dockerfile-slave @@ -10,7 +10,8 @@ RUN curl -o miniconda.sh https://repo.continuum.io/miniconda/Miniconda3-latest-L rm -f miniconda.sh ENV PATH /opt/anaconda/bin:$PATH ARG PYTHON_VERSION -RUN conda install -c conda-forge python=$PYTHON_VERSION dask distributed pytest && conda clean -tipy +RUN conda install -c conda-forge python=$PYTHON_VERSION dask distributed pytest pytest-asyncio && conda clean -tipy +RUN pip install git+https://github.com/dask/distributed --upgrade --no-deps COPY ./setup-slave.sh / COPY ./*.sh / diff --git a/ci/slurm.sh b/ci/slurm.sh index 8f12aadf..065bfa02 100644 --- a/ci/slurm.sh +++ b/ci/slurm.sh @@ -18,7 +18,7 @@ function jobqueue_install { } function jobqueue_script { - docker exec -it slurmctld /bin/bash -c "cd /dask-jobqueue; pytest dask_jobqueue --verbose -E slurm" + docker exec -it slurmctld /bin/bash -c "pytest /dask-jobqueue/dask_jobqueue --verbose -E slurm -s" } function jobqueue_after_script { diff --git a/ci/slurm/Dockerfile b/ci/slurm/Dockerfile index 814cf792..6c6c2439 100644 --- a/ci/slurm/Dockerfile +++ b/ci/slurm/Dockerfile @@ -5,7 +5,8 @@ RUN curl -o miniconda.sh https://repo.continuum.io/miniconda/Miniconda3-latest-L /opt/anaconda/bin/conda clean -tipy && \ rm -f miniconda.sh ENV PATH /opt/anaconda/bin:$PATH -RUN conda install --yes -c conda-forge python=3.6 dask distributed flake8 pytest docrep +RUN conda install --yes -c conda-forge python=3.6 dask distributed flake8 pytest pytest-asyncio +RUN pip install git+https://github.com/dask/distributed --upgrade --no-deps ENV LC_ALL en_US.UTF-8 diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 5f6e6e1c..888e8e9d 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -1,3 +1,4 @@ +from contextlib import contextmanager import logging import math import os @@ -5,116 +6,80 @@ import shlex import subprocess import sys -from collections import OrderedDict -from contextlib import contextmanager +import weakref +import abc import dask -import docrep -from .deploy import ClusterManager -from distributed import LocalCluster -from distributed.diagnostics.plugin import SchedulerPlugin +from dask.utils import ignoring +from distributed.deploy.spec import ProcessInterface, SpecCluster +from distributed.scheduler import Scheduler + from distributed.utils import format_bytes, parse_bytes, tmpfile, get_ip_interface logger = logging.getLogger(__name__) -docstrings = docrep.DocstringProcessor() - - -def _job_id_from_worker_name(name): - """ utility to parse the job ID from the worker name - - template: 'prefix--jobid--suffix' - """ - _, job_id, _ = name.split("--") - return job_id - - -class JobQueuePlugin(SchedulerPlugin): - def __init__(self): - self.pending_jobs = OrderedDict() - self.running_jobs = OrderedDict() - self.finished_jobs = OrderedDict() - self.all_workers = {} - - def add_worker(self, scheduler, worker=None, name=None, **kwargs): - """ Run when a new worker enters the cluster""" - logger.debug("adding worker %s", worker) - w = scheduler.workers[worker] - job_id = _job_id_from_worker_name(w.name) - logger.debug("job id for new worker: %s", job_id) - self.all_workers[worker] = (w.name, job_id) - - # if this is the first worker for this job, move job to running - if job_id not in self.running_jobs: - logger.debug("%s is a new job or restarting worker", job_id) - if job_id in self.pending_jobs: - logger.debug("%s is a new job, adding to running_jobs", job_id) - self.running_jobs[job_id] = self.pending_jobs.pop(job_id) - elif job_id in self.finished_jobs: - logger.warning( - "Worker %s restart in Job %s. " "This can be due to memory issue.", - w, - job_id, - ) - self.running_jobs[job_id] = self.finished_jobs.pop(job_id) - else: - logger.error("Unknown job_id: %s for worker %s", job_id, w) - self.running_jobs[job_id] = {} - - # add worker to dict of workers in this job - self.running_jobs[job_id][w.name] = w - - def remove_worker(self, scheduler=None, worker=None, **kwargs): - """ Run when a worker leaves the cluster""" - logger.debug("removing worker %s", worker) - name, job_id = self.all_workers[worker] - logger.debug("removing worker name (%s) and job_id (%s)", name, job_id) - - # remove worker from this job - self.running_jobs[job_id].pop(name, None) - - # once there are no more workers, move this job to finished_jobs - if not self.running_jobs[job_id]: - logger.debug("that was the last worker for job %s", job_id) - self.finished_jobs[job_id] = self.running_jobs.pop(job_id) - - -@docstrings.get_sectionsf("JobQueueCluster") -class JobQueueCluster(ClusterManager): - """ Base class to launch Dask Clusters for Job queues - This class should not be used directly, use inherited class appropriate for your queueing system (e.g. PBScluster - or SLURMCluster) - - Parameters - ---------- - name : str - Name of Dask workers. +job_parameters = """ cores : int Total number of cores per job memory: str Total amount of memory per job processes : int - Number of processes per job + Cut the job up into this many processes. + Good for GIL workloads or for nodes with many cores. interface : str Network interface like 'eth0' or 'ib0'. - death_timeout : float - Seconds to wait for a scheduler before closing workers + nanny : bool + Whether or not to start a nanny process local_directory : str Dask worker local directory for file spilling. + death_timeout : float + Seconds to wait for a scheduler before closing workers extra : list Additional arguments to pass to `dask-worker` env_extra : list Other commands to add to script before launching worker. + header_skip : list + Lines to skip in the header. + Header lines matching this text will be removed log_directory : str Directory to use for job scheduler logs. shebang : str Path to desired interpreter for your batch submission script. python : str Python executable used to launch Dask workers. + Defaults to the Python that is submitting these jobs config_name : str Section to use from jobqueue.yaml configuration file. - kwargs : dict - Additional keyword arguments to pass to `LocalCluster` + name : str + Name of Dask worker. This is typically set by the Cluster +""".strip() + + +cluster_parameters = """ + n_workers : int + Number of workers to start by default. Defaults to 0. + See the scale method + silence_logs : str + Log level like "debug", "info", or "error" to emit here if the + scheduler is started locally + asynchronous : bool + Whether or not to run this cluster object with the async/await syntax + security : Security + A dask.distributed security object if you're using TLS/SSL + dashboard_address : str or int + An address like ":8787" on which to host the Scheduler's dashboard +""".strip() + + +class Job(ProcessInterface, abc.ABC): + """ Base class to launch Dask workers on Job queues + + This class should not be used directly, use a class appropriate for + your queueing system (e.g. PBScluster or SLURMCluster) instead. + + Parameters + ---------- + {job_parameters} Attributes ---------- @@ -133,53 +98,64 @@ class JobQueueCluster(ClusterManager): OARCluster LSFCluster MoabCluster - """ + """.format( + job_parameters=job_parameters + ) _script_template = """ %(shebang)s %(job_header)s - %(env_header)s - %(worker_command)s """.lstrip() # Following class attributes should be overridden by extending classes. submit_command = None cancel_command = None + config_name = None job_id_regexp = r"(?P\d+)" + @abc.abstractmethod def __init__( self, + scheduler=None, name=None, cores=None, memory=None, processes=None, + nanny=True, interface=None, death_timeout=None, local_directory=None, extra=None, env_extra=None, + header_skip=None, log_directory=None, shebang=None, python=sys.executable, + job_name=None, config_name=None, **kwargs ): - """ """ - # """ - # This initializer should be considered as Abstract, and never used directly. - # """ + self.scheduler = scheduler + self.job_id = None + super().__init__() if config_name is None: - raise NotImplementedError( - "JobQueueCluster is an abstract class that should not be instantiated." + config_name = getattr(type(self), "config_name") + if config_name is None: + raise ValueError( + "Looks like you are trying to create a class that inherits from dask_jobqueue.core.Job. " + "If that is the case, you need to:\n" + "- set the 'config_name' class variable to a non-None value\n" + "- create a section in jobqueue.yaml with the value of 'config_name'\n" + "If that is not the case, please open an issue in https://github.com/dask/dask-jobqueue/issues." ) - if name is None: - name = dask.config.get("jobqueue.%s.name" % config_name) + if job_name is None: + job_name = dask.config.get("jobqueue.%s.name" % config_name) if cores is None: cores = dask.config.get("jobqueue.%s.cores" % config_name) if memory is None: @@ -198,6 +174,8 @@ def __init__( extra = dask.config.get("jobqueue.%s.extra" % config_name) if env_extra is None: env_extra = dask.config.get("jobqueue.%s.env-extra" % config_name) + if header_skip is None: + header_skip = dask.config.get("jobqueue.%s.header-skip" % config_name, ()) if log_directory is None: log_directory = dask.config.get("jobqueue.%s.log-directory" % config_name) if shebang is None: @@ -215,42 +193,35 @@ def __init__( self.job_header = None if interface: - extra += ["--interface", interface] + extra = extra + ["--interface", interface] kwargs.setdefault("host", get_ip_interface(interface)) else: kwargs.setdefault("host", "") - # Bokeh diagnostics server should listen on all interfaces - kwargs.setdefault("dashboard_address", ("", 8787)) - self.local_cluster = LocalCluster(n_workers=0, **kwargs) - # Keep information on process, cores, and memory, for use in subclasses self.worker_memory = parse_bytes(memory) if memory is not None else None self.worker_processes = processes self.worker_cores = cores self.name = name - - # plugin for tracking job status - self._scheduler_plugin = JobQueuePlugin() - self.local_cluster.scheduler.add_plugin(self._scheduler_plugin) - - self._adaptive = None + self.job_name = job_name self.shebang = shebang - self._env_header = "\n".join(env_extra) + self._env_header = "\n".join(filter(None, env_extra)) + self.header_skip = set(header_skip) # dask-worker command line build dask_worker_command = "%(python)s -m distributed.cli.dask_worker" % dict( python=python ) - command_args = [dask_worker_command, self.scheduler.address] + command_args = [dask_worker_command, self.scheduler] command_args += ["--nthreads", self.worker_process_threads] if processes is not None and processes > 1: command_args += ["--nprocs", processes] command_args += ["--memory-limit", self.worker_process_memory] - command_args += ["--name", "%s--${JOB_ID}--" % name] + command_args += ["--name", str(name)] + command_args += ["--nanny" if nanny else "--no-nanny"] if death_timeout is not None: command_args += ["--death-timeout", death_timeout] @@ -266,69 +237,18 @@ def __init__( if not os.path.exists(self.log_directory): os.makedirs(self.log_directory) - def __repr__(self): - running_workers = self._count_active_workers() - running_cores = running_workers * self.worker_process_threads - total_jobs = len(self.pending_jobs) + len(self.running_jobs) - total_workers = total_jobs * self.worker_processes - running_memory = running_workers * self.worker_memory / self.worker_processes - - return ( - self.__class__.__name__ - + "(cores=%d, memory=%s, workers=%d/%d, jobs=%d/%d)" - % ( - running_cores, - format_bytes(running_memory), - running_workers, - total_workers, - len(self.running_jobs), - total_jobs, - ) - ) - - @property - def pending_jobs(self): - """ Jobs pending in the queue """ - return self._scheduler_plugin.pending_jobs - - @property - def running_jobs(self): - """ Jobs with currently active workers """ - return self._scheduler_plugin.running_jobs - - @property - def finished_jobs(self): - """ Jobs that have finished """ - return self._scheduler_plugin.finished_jobs - - @property - def worker_process_threads(self): - return int(self.worker_cores / self.worker_processes) - - @property - def worker_process_memory(self): - mem = format_bytes(self.worker_memory / self.worker_processes) - mem = mem.replace(" ", "") - return mem - - @property - def jobqueue_worker_spec(self): - """ single worker process info needed for scaling on cores or memory """ - return { - "cores": self.worker_process_threads, - "memory": self.worker_process_memory, - } - - @property - def workers(self): - """ workers currently connected to the scheduler """ - return self.scheduler.workers - def job_script(self): """ Construct a job submission script """ + header = "\n".join( + [ + line + for line in self.job_header.split("\n") + if not any(skip in line for skip in self.header_skip) + ] + ) pieces = { "shebang": self.shebang, - "job_header": self.job_header, + "job_header": header, "env_header": self._env_header, "worker_command": self._command_template, } @@ -343,28 +263,67 @@ def job_file(self): f.write(self.job_script()) yield fn - def _submit_job(self, script_filename): + async def _submit_job(self, script_filename): + # Should we make this async friendly? return self._call(shlex.split(self.submit_command) + [script_filename]) - def start_workers(self, n=1): - """ Start workers and point them to our local scheduler """ - logger.debug("starting %s workers", n) - num_jobs = int(math.ceil(n / self.worker_processes)) - for _ in range(num_jobs): - with self.job_file() as fn: - out = self._submit_job(fn) - job = self._job_id_from_submit_output(out) - if not job: - raise ValueError("Unable to parse jobid from output of %s" % out) - logger.debug("started job: %s", job) - self.pending_jobs[job] = {} + @property + def worker_process_threads(self): + return int(self.worker_cores / self.worker_processes) @property - def scheduler(self): - """ The scheduler of this cluster """ - return self.local_cluster.scheduler + def worker_process_memory(self): + mem = format_bytes(self.worker_memory / self.worker_processes) + mem = mem.replace(" ", "") + return mem + + async def start(self): + """ Start workers and point them to our local scheduler """ + logger.debug("Starting worker: %s", self.name) + + with self.job_file() as fn: + out = await self._submit_job(fn) + self.job_id = self._job_id_from_submit_output(out) - def _call(self, cmd, **kwargs): + weakref.finalize(self, self._close_job, self.job_id) + + logger.debug("Starting job: %s", self.job_id) + await super().start() + + def _job_id_from_submit_output(self, out): + match = re.search(self.job_id_regexp, out) + if match is None: + msg = ( + "Could not parse job id from submission command " + "output.\nJob id regexp is {!r}\nSubmission command " + "output is:\n{}".format(self.job_id_regexp, out) + ) + raise ValueError(msg) + + job_id = match.groupdict().get("job_id") + if job_id is None: + msg = ( + "You need to use a 'job_id' named group in your regexp, e.g. " + "r'(?P\\d+)', in your regexp. Your regexp was: " + "{!r}".format(self.job_id_regexp) + ) + raise ValueError(msg) + + return job_id + + async def close(self): + logger.debug("Stopping worker: %s job: %s", self.name, self.job_id) + self._close_job(self.job_id) + + @classmethod + def _close_job(cls, job_id): + if job_id: + with ignoring(RuntimeError): # deleting job when job already gone + cls._call(shlex.split(cls.cancel_command) + [job_id]) + logger.debug("Closed job %s", job_id) + + @staticmethod + def _call(cmd, **kwargs): """ Call a command using subprocess.Popen. This centralizes calls out to the command line, providing consistent @@ -399,6 +358,7 @@ def _call(self, cmd, **kwargs): out, err = proc.communicate() out, err = out.decode(), err.decode() + if proc.returncode != 0: raise RuntimeError( "Command exited with non-zero exit code.\n" @@ -409,146 +369,137 @@ def _call(self, cmd, **kwargs): ) return out - def stop_workers(self, workers): - """ Stop a list of workers""" - logger.debug("Stopping workers: %s", workers) - if not workers: - return - jobs = self._del_pending_jobs() # stop pending jobs too - for w in workers: - if isinstance(w, dict): - jobs.append(_job_id_from_worker_name(w["name"])) - else: - jobs.append(_job_id_from_worker_name(w.name)) - self.stop_jobs(jobs) - - def stop_jobs(self, jobs): - """ Stop a list of jobs""" - logger.debug("Stopping jobs: %s", jobs) - if jobs: - jobs = list(jobs) - self._call(shlex.split(self.cancel_command) + list(set(jobs))) - - # if any of these jobs were pending, we should remove those now - for job_id in jobs: - if job_id in self.pending_jobs: - del self.pending_jobs[job_id] - - def scale_up(self, n, **kwargs): - """ Brings total worker count up to ``n`` """ - active_and_pending = self._count_active_and_pending_workers() - if n >= active_and_pending: - logger.debug("Scaling up to %d workers.", n) - self.start_workers(n - active_and_pending) - else: - # scale_up should not be called if n < active + pending jobs - logger.warning( - "JobQueueCluster.scale_up was called with a" - " number of workers lower that what is already" - " running or pending" - ) - def _count_active_and_pending_workers(self): - active_and_pending = ( - self._count_active_workers() + self._count_pending_workers() - ) - logger.debug("Found %d active/pending workers.", active_and_pending) - assert len(self.scheduler.workers) <= active_and_pending - return active_and_pending - - def _count_active_workers(self): - active_workers = sum([len(j) for j in self.running_jobs.values()]) - assert len(self.scheduler.workers) == active_workers - return active_workers - - def _count_pending_workers(self): - return self.worker_processes * len(self.pending_jobs) - - def scale_down(self, workers, n=None): - """ Close the workers with the given addresses """ - if n is None: - # Adaptive currently calls directly scale_down, we need to handle this - # Need to only keep active workers minus those adaptive wants to stop - n = self._count_active_workers() - len(workers) - logger.debug("Scaling down to %d Workers: %s", n, workers) - active_and_pending = self._count_active_and_pending_workers() - n_to_close = active_and_pending - n - if n_to_close < 0: - logger.warning( - "JobQueueCluster.scale_down was called with" - " a number of worker greater than what is" - " already running or pending." - ) - elif n_to_close <= self._count_pending_workers(): - # We only need to kill some pending jobs, - to_kill = int(n_to_close / self.worker_processes) - jobs = list(self.pending_jobs.keys())[-to_kill:] - logger.debug("%d jobs to stop, stopping jobs %s", to_kill, jobs) - self.stop_jobs(jobs) - else: - worker_states = [] - for w in workers: - try: - # Get the actual WorkerState - worker_states.append(self.scheduler.workers[w]) - except KeyError: - logger.debug("worker %s is already gone", w) - self.stop_workers(worker_states) - - def stop_all_jobs(self): - """ Stops all running and pending jobs """ - jobs = self._del_pending_jobs() - jobs += list(self.running_jobs.keys()) - self.stop_jobs(set(jobs)) - - def close(self, **kwargs): - """ Stops all running and pending jobs and stops scheduler """ - self.stop_all_jobs() - return self.local_cluster.close(**kwargs) - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - self.close() - self.local_cluster.__exit__(type, value, traceback) - - def _del_pending_jobs(self): - jobs = list(self.pending_jobs.keys()) - logger.debug("Deleting pending jobs %s" % jobs) - for job_id in jobs: - del self.pending_jobs[job_id] - return jobs +class JobQueueCluster(SpecCluster): + __doc__ = """ Deploy Dask on a Job queuing system - def _job_id_from_submit_output(self, out): - match = re.search(self.job_id_regexp, out) - if match is None: - msg = ( - "Could not parse job id from submission command " - "output.\nJob id regexp is {!r}\nSubmission command " - "output is:\n{}".format(self.job_id_regexp, out) - ) - raise ValueError(msg) + This is a superclass, and is rarely used directly. It is more common to + use an object like SGECluster, SLURMCluster, PBSCluster, LSFCluster, or + others. - job_id = match.groupdict().get("job_id") - if job_id is None: - msg = ( - "You need to use a 'job_id' named group in your regexp, e.g. " - "r'(?P\\d+)', in your regexp. Your regexp was: " - "{!r}".format(self.job_id_regexp) + However, it can be used directly if you have a custom ``Job`` type. + This class relies heavily on being passed a ``Job`` type that is able to + launch one Job on a job queueing system. + + Parameters + ---------- + Job : Job + A class that can be awaited to ask for a single Job + {cluster_parameters} + """.format( + cluster_parameters=cluster_parameters + ) + + job_cls = None + + def __init__( + self, + n_workers=0, + job_cls: Job = None, + # Cluster keywords + loop=None, + security=None, + silence_logs="error", + name=None, + asynchronous=False, + # Scheduler keywords + interface=None, + host=None, + protocol="tcp://", + dashboard_address=":8787", + config_name=None, + # Job keywords + **kwargs + ): + self.status = "created" + if job_cls is not None: + self.job_cls = job_cls + + if self.job_cls is None: + raise ValueError( + "You must provide a Job type like PBSJob, SLURMJob, " + "or SGEJob with the job_cls= argument." ) - raise ValueError(msg) - return job_id + if config_name: + if interface is None: + interface = dask.config.get("jobqueue.%s.interface" % config_name) + + scheduler = { + "cls": Scheduler, # Use local scheduler for now + "options": { + "protocol": protocol, + "interface": interface, + "host": host, + "dashboard_address": dashboard_address, + "security": security, + }, + } + if config_name: + kwargs["config_name"] = config_name + kwargs["interface"] = interface + kwargs["protocol"] = protocol + kwargs["security"] = security + self._kwargs = kwargs + worker = {"cls": self.job_cls, "options": kwargs} + if "processes" in kwargs and kwargs["processes"] > 1: + worker["group"] = ["-" + str(i) for i in range(kwargs["processes"])] + + self._dummy_job # trigger property to ensure that the job is valid + + super().__init__( + scheduler=scheduler, + worker=worker, + loop=loop, + silence_logs=silence_logs, + asynchronous=asynchronous, + name=name, + ) - @staticmethod - def worker_key(worker_state): - return _job_id_from_worker_name(worker_state.name) + if n_workers: + self.scale(n_workers) + + @property + def _dummy_job(self): + """ + Creates a Job similar to what we will use in practice + + This is used for backwards functionality and a variety of convenience + functions. It is also used on construction to raise errors if any of + the keywords are improper. + """ + try: + address = self.scheduler.address # Have we already connected? + except AttributeError: + address = "tcp://:8786" + return self.job_cls( + address or "tcp://:8786", + name="name", + **self._kwargs + ) @property - def scheduler_comm(self): - return self.local_cluster.scheduler_comm + def job_header(self): + return self._dummy_job.job_header + + def job_script(self): + return self._dummy_job.job_script() @property - def scheduler_info(self): - return self.local_cluster.scheduler_info + def job_name(self): + return self._dummy_job.job_name + + def scale(self, n=None, jobs=0, memory=None, cores=None): + if n is not None: + jobs = int(math.ceil(n / self._dummy_job.worker_processes)) + + return super().scale(jobs, memory=memory, cores=cores) + + def adapt( + self, *args, minimum_jobs: int = None, maximum_jobs: int = None, **kwargs + ): + if minimum_jobs is not None: + kwargs["minimum"] = minimum_jobs * self._dummy_job.worker_processes + if maximum_jobs is not None: + kwargs["maximum"] = maximum_jobs * self._dummy_job.worker_processes + return super().adapt(*args, **kwargs) diff --git a/dask_jobqueue/deploy/__init__.py b/dask_jobqueue/deploy/__init__.py deleted file mode 100644 index c9e11c0f..00000000 --- a/dask_jobqueue/deploy/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -# flake8: noqa -from .cluster_manager import ClusterManager diff --git a/dask_jobqueue/deploy/cluster_manager.py b/dask_jobqueue/deploy/cluster_manager.py deleted file mode 100644 index 6910c82f..00000000 --- a/dask_jobqueue/deploy/cluster_manager.py +++ /dev/null @@ -1,392 +0,0 @@ -import logging -import math - -from tornado import gen - -from distributed.deploy.adaptive import Adaptive -from distributed.utils import ( - log_errors, - ignoring, - parse_bytes, - PeriodicCallback, - format_bytes, - format_dashboard_link, -) - -logger = logging.getLogger(__name__) - - -class ClusterManager: - """ Intermediate Cluster object that should lead to a real ClusterManager - - This tries to improve upstream Cluster object and underlines needs for - better decoupling between ClusterManager and Scheduler object - - This currently expects a local Scheduler defined on the object, but should - eventually only rely on RPC calls on remote or local scheduler. - It provides common methods and an IPython widget display. - - Clusters inheriting from this class should provide the following: - - 1. A local ``Scheduler`` object at ``.scheduler``. In the future, just - a URL to local or remote scheduler. - 2. scale_up and scale_down methods as defined below:: - - def scale_up(self, n: int): - ''' Brings total worker count up to ``n`` ''' - - def scale_down(self, workers: List[str], n: int): - ''' Close the workers with the given addresses or remove pending - workers to match n running workers. - ''' - 3. Optionally worker_key: Callable(WorkerState): - ''' Callable mapping a WorkerState object to a group, see - Scheduler.workers_to_close - ''' - 4. jobqueue_worker_spec dict attribute if scale(cores=...) or scale(memory=...) - can be used by users. - jobqueue_worker_spec = {'cores': 4, 'memory': '16 GB'} - - This will provide a general ``scale`` method as well as an IPython widget - for display. - - Things the will need to change for the complete Cluster Manager Design: - - ClusterManager: - - Use it's own event loop, or the notebook one. - - Connect to a local or remote Scheduler through RPC, and then - communicate with it. - - Ability to start a local or remote scheduler. - - Ability to work with different worker pools: in scale, adaptive, - jobqueue_worker_spec... - - Scheduler - - Provide some remote methods: - - retire_workers(n: int): close enough workers ot have only n - running at the end. Return the closed workers. - - status of connected worker, e.g. scheduler_info() - - Examples - -------- - - >>> from distributed.deploy import Cluster - >>> class MyCluster(cluster): - ... def scale_up(self, n): - ... ''' Bring the total worker count up to n ''' - ... pass - ... def scale_down(self, workers, n=None): - ... ''' Close the workers with the given addresses ''' - ... pass - - >>> cluster = MyCluster() - >>> cluster.scale(5) # scale manually - >>> cluster.adapt(minimum=1, maximum=100) # scale automatically - >>> cluster.scale(cores=100) # scale manually to cores nb - """ - - def __init__(self, adaptive_options={}): - self._target_scale = 0 - self._adaptive_options = adaptive_options - self._adaptive_options.setdefault("worker_key", self.worker_key) - - def adapt( - self, - minimum_cores=None, - maximum_cores=None, - minimum_memory=None, - maximum_memory=None, - **kwargs - ): - """ Turn on adaptivity - For keyword arguments see dask.distributed.Adaptive - Instead of minimum and maximum parameters which apply to the number of - worker, If Cluster object implements jobqueue_worker_spec attribute, one can - use the following parameters: - Parameters - ---------- - minimum_cores: int - Minimum number of cores for the cluster - maximum_cores: int - Maximum number of cores for the cluster - minimum_memory: str - Minimum amount of memory for the cluster - maximum_memory: str - Maximum amount of memory for the cluster - Examples - -------- - >>> cluster.adapt(minimum=0, maximum=10, interval='500ms') - >>> cluster.adapt(minimum_cores=24, maximum_cores=96) - >>> cluster.adapt(minimum_memory='60 GB', maximum_memory= '1 TB') - """ - with ignoring(AttributeError): - self._adaptive.stop() - if not hasattr(self, "_adaptive_options"): - self._adaptive_options = {} - if "minimum" not in kwargs: - if minimum_cores is not None: - kwargs["minimum"] = self._get_nb_workers_from_cores(minimum_cores) - elif minimum_memory is not None: - kwargs["minimum"] = self._get_nb_workers_from_memory(minimum_memory) - if "maximum" not in kwargs: - if maximum_cores is not None: - kwargs["maximum"] = self._get_nb_workers_from_cores(maximum_cores) - elif maximum_memory is not None: - kwargs["maximum"] = self._get_nb_workers_from_memory(maximum_memory) - self._adaptive_options.update(kwargs) - try: - self._adaptive = Adaptive(self.scheduler, self, **self._adaptive_options) - except Exception: - self._adaptive = Adaptive(self, **self._adaptive_options) - return self._adaptive - - @property - def scheduler_address(self): - return self.scheduler.address - - @property - def dashboard_link(self): - host = self.scheduler.address.split("://")[1].split(":")[0] - port = self.scheduler.services["dashboard"].port - return format_dashboard_link(host, port) - - @gen.coroutine - def _scale(self, n=None, cores=None, memory=None): - """ Asynchronously called scale method - - This allows to do every operation with a coherent context - """ - with log_errors(): - if [n, cores, memory].count(None) != 2: - raise ValueError( - "One and only one of n, cores, memory kwargs" - " should be used, n={}, cores={}, memory={}" - " provided.".format(n, cores, memory) - ) - if n is None: - if cores is not None: - n = self._get_nb_workers_from_cores(cores) - elif memory is not None: - n = self._get_nb_workers_from_memory(memory) - - # here we rely on a ClusterManager attribute to retrieve the - # active and pending workers - if n == self._target_scale: - pass - elif n > self._target_scale: - self.scale_up(n) - else: - # TODO to_close may be empty if some workers are pending - # This may not be useful to call scheduler methods in this case - # Scheduler interface here may need to be modified - to_close = self.scheduler.workers_to_close( - n=len(self.scheduler.workers) - n, minimum=n, key=self.worker_key - ) - logger.debug("Closing workers: %s", to_close) - # Should be an RPC call here - yield self.scheduler.retire_workers(workers=to_close) - # To close may be empty if just asking to remove pending - # workers, so we should also give a target number - self.scale_down(to_close, n) - self._target_scale = n - - def scale(self, n=None, cores=None, memory=None): - """ Scale cluster to n workers or to the given number of cores or - memory - number of cores and memory are converted into number of workers using - jobqueue_worker_spec attribute. - Parameters - ---------- - n: int - Target number of workers - cores: int - Target number of cores - memory: str - Target amount of available memory - Example - ------- - >>> cluster.scale(10) # scale cluster to ten workers - >>> cluster.scale(cores=100) # scale cluster to 100 cores - >>> cluster.scale(memory='1 TB') # scale cluster to 1 TB memory - See Also - -------- - Cluster.scale_up - Cluster.scale_down - Cluster.jobqueue_worker_spec - """ - # TODO we should not rely on scheduler loop here, self should have its - # own loop - self.scheduler.loop.add_callback(self._scale, n, cores, memory) - - def _widget_status(self): - workers = len(self.scheduler.workers) - cores = sum(ws.nthreads for ws in self.scheduler.workers.values()) - memory = sum(ws.memory_limit for ws in self.scheduler.workers.values()) - memory = format_bytes(memory) - text = """ -
- - - - - -
Workers %d
Cores %d
Memory %s
-
-""" % ( - workers, - cores, - memory, - ) - return text - - def _widget(self): - """ Create IPython widget for display within a notebook """ - try: - return self._cached_widget - except AttributeError: - pass - - from ipywidgets import ( - Layout, - VBox, - HBox, - IntText, - Button, - HTML, - Accordion, - Text, - ) - - layout = Layout(width="150px") - - if "dashboard" in self.scheduler.services: - link = self.dashboard_link - link = '

Dashboard: %s

\n' % ( - link, - link, - ) - else: - link = "" - - title = "

%s

" % type(self).__name__ - title = HTML(title) - dashboard = HTML(link) - - status = HTML(self._widget_status(), layout=Layout(min_width="150px")) - - request = IntText(0, description="Workers", layout=layout) - scale = Button(description="Scale", layout=layout) - request_cores = IntText(0, description="Cores", layout=layout) - scale_cores = Button(description="Scale", layout=layout) - request_memory = Text("O GB", description="Memory", layout=layout) - scale_memory = Button(description="Scale", layout=layout) - - minimum = IntText(0, description="Minimum", layout=layout) - maximum = IntText(0, description="Maximum", layout=layout) - adapt = Button(description="Adapt", layout=layout) - minimum_cores = IntText(0, description="Min cores", layout=layout) - maximum_cores = IntText(0, description="Max cores", layout=layout) - adapt_cores = Button(description="Adapt", layout=layout) - minimum_mem = Text("0 GB", description="Min memory", layout=layout) - maximum_mem = Text("0 GB", description="Max memory", layout=layout) - adapt_mem = Button(description="Adapt", layout=layout) - - scale_hbox = [HBox([request, scale])] - adapt_hbox = [HBox([minimum, maximum, adapt])] - if hasattr(self, "jobqueue_worker_spec"): - scale_hbox.append(HBox([request_cores, scale_cores])) - scale_hbox.append(HBox([request_memory, scale_memory])) - adapt_hbox.append(HBox([minimum_cores, maximum_cores, adapt_cores])) - adapt_hbox.append(HBox([minimum_mem, maximum_mem, adapt_mem])) - - accordion = Accordion( - [VBox(scale_hbox), VBox(adapt_hbox)], layout=Layout(min_width="500px") - ) - accordion.selected_index = None - accordion.set_title(0, "Manual Scaling") - accordion.set_title(1, "Adaptive Scaling") - - box = VBox([title, HBox([status, accordion]), dashboard]) - - self._cached_widget = box - - def adapt_cb(b): - self.adapt(minimum=minimum.value, maximum=maximum.value) - - def adapt_cores_cb(b): - self.adapt( - minimum_cores=minimum_cores.value, maximum_cores=maximum_cores.value - ) - - def adapt_mem_cb(b): - self.adapt( - minimum_memory=minimum_mem.value, maximum_memory=maximum_mem.value - ) - - adapt.on_click(adapt_cb) - adapt_cores.on_click(adapt_cores_cb) - adapt_mem.on_click(adapt_mem_cb) - - def scale_cb(request, kwarg): - def request_cb(b): - with log_errors(): - arg = request.value - with ignoring(AttributeError): - self._adaptive.stop() - local_kwargs = dict() - local_kwargs[kwarg] = arg - self.scale(**local_kwargs) - - return request_cb - - scale.on_click(scale_cb(request, "n")) - scale_cores.on_click(scale_cb(request_cores, "cores")) - scale_memory.on_click(scale_cb(request_memory, "memory")) - - def update(): - status.value = self._widget_status() - - pc = PeriodicCallback(update, 500, io_loop=self.scheduler.loop) - self.scheduler.periodic_callbacks["cluster-repr"] = pc - pc.start() - - return box - - def _ipython_display_(self, **kwargs): - return self._widget()._ipython_display_(**kwargs) - - def worker_key(self, worker_state): - """ Callable mapping a WorkerState object to a group, see - Scheduler.workers_to_close - """ - return worker_state - - def _get_nb_workers_from_cores(self, cores): - return math.ceil(cores / self.jobqueue_worker_spec["cores"]) - - def _get_nb_workers_from_memory(self, memory): - return math.ceil( - parse_bytes(memory) / parse_bytes(self.jobqueue_worker_spec["memory"]) - ) - - @property - def jobqueue_worker_spec(self): - """ single worker process info needed for scaling on cores or memory """ - raise NotImplementedError( - "{} class does not provide jobqueue_worker_spec " - "attribute, needed for scaling with " - "cores or memory kwargs.".format(self.__class__.__name__) - ) - - @property - def loop(self): - return self.scheduler.loop diff --git a/dask_jobqueue/htcondor.py b/dask_jobqueue/htcondor.py index 7b1efcba..6b3769ed 100644 --- a/dask_jobqueue/htcondor.py +++ b/dask_jobqueue/htcondor.py @@ -5,40 +5,12 @@ import dask from distributed.utils import parse_bytes -from .core import JobQueueCluster, docstrings +from .core import JobQueueCluster, Job, job_parameters, cluster_parameters logger = logging.getLogger(__name__) -class HTCondorCluster(JobQueueCluster): - __doc__ = docstrings.with_indents( - """ Launch Dask on an HTCondor cluster with a shared file system - - Parameters - ---------- - disk : str - Total amount of disk per job - job_extra : dict - Extra submit file attributes for the job - %(JobQueueCluster.parameters)s - - Examples - -------- - >>> from dask_jobqueue.htcondor import HTCondorCluster - >>> cluster = HTCondorCluster(cores=24, memory="4GB", disk="4GB") - >>> cluster.scale(10) - - >>> from dask.distributed import Client - >>> client = Client(cluster) - - This also works with adaptive clusters. This automatically launches and kill workers based on load. - HTCondor can take longer to start jobs than other batch systems - tune Adaptive parameters accordingly. - - >>> cluster.adapt(minimum=5, startup_cost='60s') - """, - 4, - ) - +class HTCondorJob(Job): _script_template = """ %(shebang)s @@ -57,7 +29,9 @@ class HTCondorCluster(JobQueueCluster): # Python (can't find its libs), so we have to go through the shell. executable = "/bin/sh" - def __init__(self, disk=None, job_extra=None, config_name="htcondor", **kwargs): + def __init__( + self, *args, disk=None, job_extra=None, config_name="htcondor", **kwargs + ): if disk is None: disk = dask.config.get("jobqueue.%s.disk" % config_name) if disk is None: @@ -71,7 +45,7 @@ def __init__(self, disk=None, job_extra=None, config_name="htcondor", **kwargs): self.job_extra = job_extra # Instantiate args and parameters from parent abstract class - super().__init__(config_name=config_name, **kwargs) + super().__init__(*args, config_name=config_name, **kwargs) env_extra = kwargs.get("env_extra", None) if env_extra is None: @@ -220,3 +194,34 @@ def quote_environment(env): entries.append("%s=%s" % (k, qv)) return " ".join(entries) + + +class HTCondorCluster(JobQueueCluster): + __doc__ = """ Launch Dask on an HTCondor cluster with a shared file system + + Parameters + ---------- + disk : str + Total amount of disk per job + job_extra : dict + Extra submit file attributes for the job + {job} + {cluster} + + Examples + -------- + >>> from dask_jobqueue.htcondor import HTCondorCluster + >>> cluster = HTCondorCluster(cores=24, memory="4GB", disk="4GB") + >>> cluster.scale(jobs=10) # ask for 10 jobs + + >>> from dask.distributed import Client + >>> client = Client(cluster) + + This also works with adaptive clusters. This automatically launches and kill workers based on load. + + >>> cluster.adapt(maximum_jobs=20) + """.format( + job=job_parameters, cluster=cluster_parameters + ) + job_cls = HTCondorJob + config_name = "htcondor" diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index eb1d3b49..d2174258 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -161,3 +161,18 @@ jobqueue: log-directory: null shebang: "#!/usr/bin/env condor_submit" + local: + name: dask-worker + # Dask worker options + cores: null # Total number of cores per job + memory: null # Total amount of memory per job + processes: 1 # Number of Python processes per job + + interface: null # Network interface to use like eth0 or ib0 + death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler + local-directory: null # Location of fast local storage like /scratch or $TMPDIR + + extra: [] + env-extra: [] + job-extra: [] + log-directory: null diff --git a/dask_jobqueue/local.py b/dask_jobqueue/local.py new file mode 100644 index 00000000..3ac2fd12 --- /dev/null +++ b/dask_jobqueue/local.py @@ -0,0 +1,100 @@ +import logging +import os +from tornado.process import Subprocess + +from .core import Job, JobQueueCluster, job_parameters, cluster_parameters + +logger = logging.getLogger(__name__) + + +class LocalJob(Job): + __doc__ = """ Use Dask Jobqueue with local bash commands + + This is mostly for testing. It uses all the same machinery of + dask-jobqueue, but rather than submitting jobs to some external job + queueing system, it launches them locally. For normal local use, please + see ``dask.distributed.LocalCluster`` + + Parameters + ---------- + {job} + """.format( + job=job_parameters + ) + + config_name = "local" + + def __init__( + self, + *args, + queue=None, + project=None, + resource_spec=None, + walltime=None, + job_extra=None, + config_name="local", + **kwargs + ): + # Instantiate args and parameters from parent abstract class + super().__init__(*args, config_name=config_name, shebang="", **kwargs) + + # Declare class attribute that shall be overridden + self.job_header = "" + + logger.debug("Job script: \n %s" % self.job_script()) + + async def _submit_job(self, script_filename): + # Should we make this async friendly? + with open(script_filename) as f: + text = f.read().strip().split() + self.process = Subprocess( + text, stdout=Subprocess.STREAM, stderr=Subprocess.STREAM + ) + + lines = [] + while True: + line = await self.process.stderr.read_until( + b"\n" + ) # make sure that we start + lines.append(line.decode()) + if b"Registered to:" in line: + break + if b"error" in line.lower(): + raise Exception("Worker failed\n\n" + "".join(lines)) + + return str(self.process.pid) + + @classmethod + def _close_job(self, job_id): + os.kill(int(job_id), 9) + # from distributed.utils_test import terminate_process + # terminate_process(self.process) + + +class LocalCluster(JobQueueCluster): + __doc__ = """ Use dask-jobqueue with local bash commands + + This is mostly for testing. It uses all the same machinery of + dask-jobqueue, but rather than submitting jobs to some external job + queueing system, it launches them locally. For normal local use, please + see ``dask.distributed.LocalCluster`` + + Parameters + ---------- + {job} + {cluster} + + Examples + -------- + >>> from dask_jobqueue import LocalCluster + >>> cluster = LocalCluster(cores=2, memory="4 GB") + >>> cluster.scale(jobs=3) # ask for 3 jobs + + See Also + -------- + dask.distributed.LocalCluster + """.format( + job=job_parameters, cluster=cluster_parameters + ) + job_cls = LocalJob + config_name = "local" diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index fd8d20af..348a356d 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -4,60 +4,18 @@ import dask -from .core import JobQueueCluster, docstrings +from .core import Job, JobQueueCluster, job_parameters, cluster_parameters logger = logging.getLogger(__name__) -class LSFCluster(JobQueueCluster): - __doc__ = docstrings.with_indents( - """ Launch Dask on a LSF cluster - - Parameters - ---------- - queue : str - Destination queue for each worker job. Passed to `#BSUB -q` option. - project : str - Accounting string associated with each worker job. Passed to - `#BSUB -P` option. - ncpus : int - Number of cpus. Passed to `#BSUB -n` option. - mem : int - Request memory in bytes. Passed to `#BSUB -M` option. - walltime : str - Walltime for each worker job in HH:MM. Passed to `#BSUB -W` option. - job_extra : list - List of other LSF options, for example -u. Each option will be - prepended with the #LSF prefix. - lsf_units : str - Unit system for large units in resource usage set by the - LSF_UNIT_FOR_LIMITS in the lsf.conf file of a cluster. - %(JobQueueCluster.parameters)s - - Examples - -------- - >>> from dask_jobqueue import LSFCluster - >>> cluster = LSFcluster(queue='general', project='DaskonLSF', - ... cores=15, memory='25GB') - >>> cluster.scale(10) # this may take a few seconds to launch - - >>> from dask.distributed import Client - >>> client = Client(cluster) - - This also works with adaptive clusters. This automatically launches and - kill workers based on load. - - >>> cluster.adapt() - """, - 4, - ) - - # Override class variables - submit_command = "bsub <" +class LSFJob(Job): + submit_command = "bsub" cancel_command = "bkill" def __init__( self, + *args, queue=None, project=None, ncpus=None, @@ -84,12 +42,12 @@ def __init__( lsf_units = dask.config.get("jobqueue.%s.lsf-units" % config_name) # Instantiate args and parameters from parent abstract class - super().__init__(config_name=config_name, **kwargs) + super().__init__(*args, config_name=config_name, **kwargs) header_lines = [] # LSF header build if self.name is not None: - header_lines.append("#BSUB -J %s" % self.name) + header_lines.append("#BSUB -J %s" % self.job_name) if self.log_directory is not None: header_lines.append( "#BSUB -e %s/%s-%%J.err" % (self.log_directory, self.name or "worker") @@ -133,10 +91,6 @@ def __init__( logger.debug("Job script: \n %s" % self.job_script()) - def _submit_job(self, script_filename): - piped_cmd = [self.submit_command + " " + script_filename + " 2> /dev/null"] - return self._call(piped_cmd, shell=True) - def lsf_format_bytes_ceil(n, lsf_units="mb"): """ Format bytes as text @@ -196,3 +150,49 @@ def lsf_detect_units(): "default unit of %s." % unit ) return unit + + +class LSFCluster(JobQueueCluster): + __doc__ = """ Launch Dask on a LSF cluster + + Parameters + ---------- + queue : str + Destination queue for each worker job. Passed to `#BSUB -q` option. + project : str + Accounting string associated with each worker job. Passed to + `#BSUB -P` option. + {job} + ncpus : int + Number of cpus. Passed to `#BSUB -n` option. + mem : int + Request memory in bytes. Passed to `#BSUB -M` option. + walltime : str + Walltime for each worker job in HH:MM. Passed to `#BSUB -W` option. + {cluster} + job_extra : list + List of other LSF options, for example -u. Each option will be + prepended with the #LSF prefix. + lsf_units : str + Unit system for large units in resource usage set by the + LSF_UNIT_FOR_LIMITS in the lsf.conf file of a cluster. + + Examples + -------- + >>> from dask_jobqueue import LSFCluster + >>> cluster = LSFCluster(queue='general', project='DaskonLSF', + ... cores=15, memory='25GB') + >>> cluster.scale(jobs=10) # ask for 10 jobs + + >>> from dask.distributed import Client + >>> client = Client(cluster) + + This also works with adaptive clusters. This automatically launches and + kill workers based on load. + + >>> cluster.adapt(maximum_jobs=20) + """.format( + job=job_parameters, cluster=cluster_parameters + ) + job_cls = LSFJob + config_name = "lsf" diff --git a/dask_jobqueue/moab.py b/dask_jobqueue/moab.py index 0114ac91..969a8475 100644 --- a/dask_jobqueue/moab.py +++ b/dask_jobqueue/moab.py @@ -1,45 +1,12 @@ -from .core import docstrings -from .pbs import PBSCluster +from .pbs import PBSJob, PBSCluster -class MoabCluster(PBSCluster): - __doc__ = docstrings.with_indents( - """Launch Dask on a Moab cluster - - Parameters - ---------- - queue : str - Destination queue for each worker job. Passed to `#PBS -q` option. - project : str - Accounting string associated with each worker job. Passed to - `#PBS -A` option. - resource_spec : str - Request resources and specify job placement. Passed to `#PBS -l` option. - walltime : str - Walltime for each worker job. - job_extra : list - List of other PBS options, for example -j oe. Each option will be prepended with the #PBS prefix. - %(JobQueueCluster.parameters)s - - Examples - -------- - >>> import os - >>> from dask_jobqueue import MoabCluster - >>> cluster = MoabCluster(processes=6, cores=6, project='gfdl_m', - memory='96G', resource_spec='96G', - job_extra=['-d /home/First.Last', '-M none'], - local_directory=os.getenv('TMPDIR', '/tmp')) - >>> cluster.scale(60) # submit enough jobs to deploy 10 workers - - >>> from dask.distributed import Client - >>> client = Client(cluster) - - This also works with adaptive clusters. This automatically launches and kill workers based on load. - - >>> cluster.adapt() - """, - 4, - ) +class MoabJob(PBSJob): submit_command = "msub" cancel_command = "canceljob" scheduler_name = "moab" + + +class MoabCluster(PBSCluster): + __doc__ = PBSCluster.__doc__.replace("PBSCluster", "MoabCluster") + job_cls = MoabJob diff --git a/dask_jobqueue/oar.py b/dask_jobqueue/oar.py index 39dc7dda..f59f8512 100644 --- a/dask_jobqueue/oar.py +++ b/dask_jobqueue/oar.py @@ -3,44 +3,12 @@ import dask -from .core import JobQueueCluster, docstrings +from .core import JobQueueCluster, Job, job_parameters, cluster_parameters logger = logging.getLogger(__name__) -class OARCluster(JobQueueCluster): - __doc__ = docstrings.with_indents( - """ Launch Dask on a OAR cluster - - Parameters - ---------- - queue : str - Destination queue for each worker job. Passed to `#OAR -q` option. - project : str - Accounting string associated with each worker job. Passed to `#OAR -p` option. - resource_spec : str - Request resources and specify job placement. Passed to `#OAR -l` option. - walltime : str - Walltime for each worker job. - job_extra : list - List of other OAR options, for example `-t besteffort`. Each option will be prepended with the #OAR prefix. - %(JobQueueCluster.parameters)s - - Examples - -------- - >>> from dask_jobqueue import OARCluster - >>> cluster = OARCluster(queue='regular') - >>> cluster.scale(10) # this may take a few seconds to launch - - >>> from dask.distributed import Client - >>> client = Client(cluster) - - This also works with adaptive clusters. This automatically launches and kill workers based on load. - - >>> cluster.adapt() - """, - 4, - ) +class OARJob(Job): # Override class variables submit_command = "oarsub" @@ -49,6 +17,7 @@ class OARCluster(JobQueueCluster): def __init__( self, + *args, queue=None, project=None, resource_spec=None, @@ -68,11 +37,11 @@ def __init__( if job_extra is None: job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name) - super().__init__(config_name=config_name, **kwargs) + super().__init__(*args, config_name=config_name, **kwargs) header_lines = [] - if self.name is not None: - header_lines.append("#OAR -n %s" % self.name) + if self.job_name is not None: + header_lines.append("#OAR -n %s" % self.job_name) if queue is not None: header_lines.append("#OAR -q %s" % queue) if project is not None: @@ -121,3 +90,40 @@ def _submit_job(self, fn): oarsub_command = " ".join([self.submit_command] + oarsub_options) oarsub_command_split = shlex.split(oarsub_command) + [inline_script] return self._call(oarsub_command_split) + + +class OARCluster(JobQueueCluster): + __doc__ = """ Launch Dask on an OAR cluster + + Parameters + ---------- + queue : str + Destination queue for each worker job. Passed to `#OAR -q` option. + project : str + Accounting string associated with each worker job. Passed to `#OAR -p` option. + {job} + {cluster} + resource_spec : str + Request resources and specify job placement. Passed to `#OAR -l` option. + walltime : str + Walltime for each worker job. + job_extra : list + List of other OAR options, for example `-t besteffort`. Each option will be prepended with the #OAR prefix. + + Examples + -------- + >>> from dask_jobqueue import OARCluster + >>> cluster = OARCluster(queue='regular') + >>> cluster.scale(jobs=10) # ask for 10 jobs + + >>> from dask.distributed import Client + >>> client = Client(cluster) + + This also works with adaptive clusters. This automatically launches and kill workers based on load. + + >>> cluster.adapt(maximum_jobs=20) + """.format( + job=job_parameters, cluster=cluster_parameters + ) + job_cls = OARJob + config_name = "oar" diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index e2b9529c..f8b3acd1 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -4,59 +4,44 @@ import dask -from .core import JobQueueCluster, docstrings +from .core import Job, JobQueueCluster, job_parameters, cluster_parameters logger = logging.getLogger(__name__) -class PBSCluster(JobQueueCluster): - __doc__ = docstrings.with_indents( - """ Launch Dask on a PBS cluster - - Parameters - ---------- - queue : str - Destination queue for each worker job. Passed to `#PBS -q` option. - project : str - Accounting string associated with each worker job. Passed to - `#PBS -A` option. - resource_spec : str - Request resources and specify job placement. Passed to `#PBS -l` - option. - walltime : str - Walltime for each worker job. - job_extra : list - List of other PBS options, for example -j oe. Each option will be prepended with the #PBS prefix. - %(JobQueueCluster.parameters)s - - Examples - -------- - >>> from dask_jobqueue import PBSCluster - >>> cluster = PBSCluster(queue='regular', project='DaskOnPBS', cores=12) - >>> cluster.scale(10) # this may take a few seconds to launch - - >>> from dask.distributed import Client - >>> client = Client(cluster) - - This also works with adaptive clusters. This automatically launches and kill workers based on load. +def pbs_format_bytes_ceil(n): + """ Format bytes as text. - >>> cluster.adapt() + PBS expects KiB, MiB or Gib, but names it KB, MB, GB whereas Dask makes the difference between KB and KiB. - It is a good practice to define local_directory to your PBS system scratch directory: + >>> pbs_format_bytes_ceil(1) + '1B' + >>> pbs_format_bytes_ceil(1234) + '1234B' + >>> pbs_format_bytes_ceil(12345678) + '13MB' + >>> pbs_format_bytes_ceil(1234567890) + '1177MB' + >>> pbs_format_bytes_ceil(15000000000) + '14GB' + """ + if n >= 10 * (1024 ** 3): + return "%dGB" % math.ceil(n / (1024 ** 3)) + if n >= 10 * (1024 ** 2): + return "%dMB" % math.ceil(n / (1024 ** 2)) + if n >= 10 * 1024: + return "%dkB" % math.ceil(n / 1024) + return "%dB" % n - >>> cluster = PBSCluster(queue='regular', project='DaskOnPBS', - ... local_directory='$TMPDIR', - ... cores=24, processes=6, memory='100GB') - """, - 4, - ) - # Override class variables +class PBSJob(Job): submit_command = "qsub" cancel_command = "qdel" + config_name = "pbs" def __init__( self, + *args, queue=None, project=None, resource_spec=None, @@ -79,15 +64,15 @@ def __init__( ) or os.environ.get("PBS_ACCOUNT") # Instantiate args and parameters from parent abstract class - super().__init__(config_name=config_name, **kwargs) + super().__init__(*args, config_name=config_name, **kwargs) # Try to find a project name from environment variable project = project or os.environ.get("PBS_ACCOUNT") header_lines = [] # PBS header build - if self.name is not None: - header_lines.append("#PBS -N %s" % self.name) + if self.job_name is not None: + header_lines.append("#PBS -N %s" % self.job_name) if queue is not None: header_lines.append("#PBS -q %s" % queue) if project is not None: @@ -117,26 +102,39 @@ def __init__( logger.debug("Job script: \n %s" % self.job_script()) -def pbs_format_bytes_ceil(n): - """ Format bytes as text. +class PBSCluster(JobQueueCluster): + __doc__ = """ Launch Dask on a PBS cluster - PBS expects KiB, MiB or Gib, but names it KB, MB, GB whereas Dask makes the difference between KB and KiB. + Parameters + ---------- + queue : str + Destination queue for each worker job. Passed to `#PBS -q` option. + project : str + Accounting string associated with each worker job. Passed to `#PBS -A` option. + {job} + {cluster} + resource_spec : str + Request resources and specify job placement. Passed to `#PBS -l` option. + walltime : str + Walltime for each worker job. + job_extra : list + List of other PBS options. Each option will be prepended with the #PBS prefix. - >>> pbs_format_bytes_ceil(1) - '1B' - >>> pbs_format_bytes_ceil(1234) - '1234B' - >>> pbs_format_bytes_ceil(12345678) - '13MB' - >>> pbs_format_bytes_ceil(1234567890) - '1177MB' - >>> pbs_format_bytes_ceil(15000000000) - '14GB' - """ - if n >= 10 * (1024 ** 3): - return "%dGB" % math.ceil(n / (1024 ** 3)) - if n >= 10 * (1024 ** 2): - return "%dMB" % math.ceil(n / (1024 ** 2)) - if n >= 10 * 1024: - return "%dkB" % math.ceil(n / 1024) - return "%dB" % n + Examples + -------- + >>> from dask_jobqueue import PBSCluster + >>> cluster = PBSCluster(queue='regular', project="myproj", cores=24, + ... memory="500 GB") + >>> cluster.scale(jobs=10) # ask for 10 jobs + + >>> from dask.distributed import Client + >>> client = Client(cluster) + + This also works with adaptive clusters. This automatically launches and kill workers based on load. + + >>> cluster.adapt(maximum_jobs=20) + """.format( + job=job_parameters, cluster=cluster_parameters + ) + job_cls = PBSJob + config_name = "pbs" diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 3b7db255..cc022b47 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -2,59 +2,18 @@ import dask -from .core import JobQueueCluster, docstrings +from .core import Job, JobQueueCluster, job_parameters, cluster_parameters logger = logging.getLogger(__name__) -class SGECluster(JobQueueCluster): - __doc__ = docstrings.with_indents( - """ Launch Dask on a SGE cluster - - .. note:: - If you want a specific amount of RAM, both ``memory`` and ``resource_spec`` - must be specified. The exact syntax of ``resource_spec`` is defined by your - GridEngine system administrator. The amount of ``memory`` requested should - match the ``resource_spec``, so that Dask's memory management system can - perform accurately. - - Parameters - ---------- - queue : str - Destination queue for each worker job. Passed to `#$ -q` option. - project : str - Accounting string associated with each worker job. Passed to `#$ -A` option. - resource_spec : str - Request resources and specify job placement. Passed to `#$ -l` option. - walltime : str - Walltime for each worker job. - job_extra : list - List of other SGE options, for example -w e. Each option will be - prepended with the #$ prefix. - %(JobQueueCluster.parameters)s - - Examples - -------- - >>> from dask_jobqueue import SGECluster - >>> cluster = SGECluster(queue='regular') - >>> cluster.scale(10) # this may take a few seconds to launch - - >>> from dask.distributed import Client - >>> client = Client(cluster) - - This also works with adaptive clusters. This automatically launches and kill workers based on load. - - >>> cluster.adapt() - """, - 4, - ) - - # Override class variables - submit_command = "qsub -terse" +class SGEJob(Job): + submit_command = "qsub" cancel_command = "qdel" def __init__( self, + *args, queue=None, project=None, resource_spec=None, @@ -74,11 +33,11 @@ def __init__( if job_extra is None: job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name) - super().__init__(config_name=config_name, **kwargs) + super().__init__(*args, config_name=config_name, **kwargs) header_lines = [] - if self.name is not None: - header_lines.append("#$ -N %(name)s") + if self.job_name is not None: + header_lines.append("#$ -N %(job-name)s") if queue is not None: header_lines.append("#$ -q %(queue)s") if project is not None: @@ -95,7 +54,7 @@ def __init__( header_template = "\n".join(header_lines) config = { - "name": self.name, + "job-name": self.job_name, "queue": queue, "project": project, "processes": self.worker_processes, @@ -106,3 +65,53 @@ def __init__( self.job_header = header_template % config logger.debug("Job script: \n %s" % self.job_script()) + + +class SGECluster(JobQueueCluster): + __doc__ = """ Launch Dask on an SGE cluster + + .. note:: + If you want a specific amount of RAM, both ``memory`` and ``resource_spec`` + must be specified. The exact syntax of ``resource_spec`` is defined by your + GridEngine system administrator. The amount of ``memory`` requested should + match the ``resource_spec``, so that Dask's memory management system can + perform accurately. + + Parameters + ---------- + queue : str + Destination queue for each worker job. Passed to `#$ -q` option. + project : str + Accounting string associated with each worker job. Passed to `#$ -A` option. + {job} + {cluster} + resource_spec : str + Request resources and specify job placement. Passed to `#$ -l` option. + walltime : str + Walltime for each worker job. + job_extra : list + List of other SGE options, for example -w e. Each option will be + prepended with the #$ prefix. + + Examples + -------- + >>> from dask_jobqueue import SGECluster + >>> cluster = SGECluster( + ... queue='regular', + ... project="myproj", + ... cores=24, + ... memory="500 GB" + ... ) + >>> cluster.scale(jobs=10) # ask for 10 jobs + + >>> from dask.distributed import Client + >>> client = Client(cluster) + + This also works with adaptive clusters. This automatically launches and kill workers based on load. + + >>> cluster.adapt(maximum_jobs=20) + """.format( + job=job_parameters, cluster=cluster_parameters + ) + job_cls = SGEJob + config_name = "sge" diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 5942c340..e17c85e2 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -3,57 +3,19 @@ import dask -from .core import JobQueueCluster, docstrings +from .core import Job, JobQueueCluster, job_parameters, cluster_parameters logger = logging.getLogger(__name__) -class SLURMCluster(JobQueueCluster): - __doc__ = docstrings.with_indents( - """ Launch Dask on a SLURM cluster - - Parameters - ---------- - queue : str - Destination queue for each worker job. Passed to `#SBATCH -p` option. - project : str - Accounting string associated with each worker job. Passed to `#SBATCH -A` option. - walltime : str - Walltime for each worker job. - job_cpu : int - Number of cpu to book in SLURM, if None, defaults to worker `threads * processes` - job_mem : str - Amount of memory to request in SLURM. If None, defaults to worker - processes * memory - job_extra : list - List of other Slurm options, for example -j oe. Each option will be prepended with the #SBATCH prefix. - %(JobQueueCluster.parameters)s - - Examples - -------- - >>> from dask_jobqueue import SLURMCluster - >>> cluster = SLURMCluster(processes=6, cores=24, memory="120GB", - env_extra=['export LANG="en_US.utf8"', - 'export LANGUAGE="en_US.utf8"', - 'export LC_ALL="en_US.utf8"']) - >>> cluster.scale(10) # this may take a few seconds to launch - - >>> from dask.distributed import Client - >>> client = Client(cluster) - - This also works with adaptive clusters. This automatically launches and kill workers based on load. - - >>> cluster.adapt() - """, - 4, - ) - +class SLURMJob(Job): # Override class variables submit_command = "sbatch" cancel_command = "scancel" def __init__( self, + *args, queue=None, project=None, walltime=None, @@ -76,19 +38,20 @@ def __init__( if job_extra is None: job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name) - super().__init__(config_name=config_name, **kwargs) + super().__init__(*args, config_name=config_name, **kwargs) - # Always ask for only one task header_lines = [] # SLURM header build - if self.name is not None: - header_lines.append("#SBATCH -J %s" % self.name) + if self.job_name is not None: + header_lines.append("#SBATCH -J %s" % self.job_name) if self.log_directory is not None: header_lines.append( - "#SBATCH -e %s/%s-%%J.err" % (self.log_directory, self.name or "worker") + "#SBATCH -e %s/%s-%%J.err" + % (self.log_directory, self.job_name or "worker") ) header_lines.append( - "#SBATCH -o %s/%s-%%J.out" % (self.log_directory, self.name or "worker") + "#SBATCH -o %s/%s-%%J.out" + % (self.log_directory, self.job_name or "worker") ) if queue is not None: header_lines.append("#SBATCH -p %s" % queue) @@ -112,13 +75,11 @@ def __init__( header_lines.append("#SBATCH -t %s" % walltime) header_lines.extend(["#SBATCH %s" % arg for arg in job_extra]) - header_lines.append("JOB_ID=${SLURM_JOB_ID%;*}") + header_lines.append("\nJOB_ID=${SLURM_JOB_ID%;*}") # Declare class attribute that shall be overridden self.job_header = "\n".join(header_lines) - logger.debug("Job script: \n %s" % self.job_script()) - def slurm_format_bytes_ceil(n): """ Format bytes as text. @@ -143,3 +104,48 @@ def slurm_format_bytes_ceil(n): if n >= 1024: return "%dK" % math.ceil(n / 1024) return "1K" % n + + +class SLURMCluster(JobQueueCluster): + __doc__ = """ Launch Dask on a SLURM cluster + + Parameters + ---------- + queue : str + Destination queue for each worker job. Passed to `#SBATCH -p` option. + project : str + Accounting string associated with each worker job. Passed to `#SBATCH -A` option. + {job} + {cluster} + walltime : str + Walltime for each worker job. + job_cpu : int + Number of cpu to book in SLURM, if None, defaults to worker `threads * processes` + job_mem : str + Amount of memory to request in SLURM. If None, defaults to worker + processes * memory + job_extra : list + List of other Slurm options, for example -j oe. Each option will be prepended with the #SBATCH prefix. + + Examples + -------- + >>> from dask_jobqueue import SLURMCluster + >>> cluster = SLURMCluster( + ... queue='regular', + ... project="myproj", + ... cores=24, + ... memory="500 GB" + ... ) + >>> cluster.scale(jobs=10) # ask for 10 jobs + + >>> from dask.distributed import Client + >>> client = Client(cluster) + + This also works with adaptive clusters. This automatically launches and kill workers based on load. + + >>> cluster.adapt(maximum_jobs=20) + """.format( + job=job_parameters, cluster=cluster_parameters + ) + job_cls = SLURMJob + config_name = "slurm" diff --git a/dask_jobqueue/tests/__init__.py b/dask_jobqueue/tests/__init__.py index 67db44ea..c666d02e 100644 --- a/dask_jobqueue/tests/__init__.py +++ b/dask_jobqueue/tests/__init__.py @@ -1 +1 @@ -QUEUE_WAIT = 15 # seconds +QUEUE_WAIT = 60 # seconds diff --git a/dask_jobqueue/tests/test_htcondor.py b/dask_jobqueue/tests/test_htcondor.py index 4f386680..cd9349f0 100644 --- a/dask_jobqueue/tests/test_htcondor.py +++ b/dask_jobqueue/tests/test_htcondor.py @@ -13,9 +13,9 @@ def test_header(): with HTCondorCluster(cores=1, memory="100MB", disk="100MB") as cluster: - assert cluster.job_header_dict["MY.DaskWorkerCores"] == 1 - assert cluster.job_header_dict["MY.DaskWorkerDisk"] == 100000000 - assert cluster.job_header_dict["MY.DaskWorkerMemory"] == 100000000 + assert cluster._dummy_job.job_header_dict["MY.DaskWorkerCores"] == 1 + assert cluster._dummy_job.job_header_dict["MY.DaskWorkerDisk"] == 100000000 + assert cluster._dummy_job.job_header_dict["MY.DaskWorkerMemory"] == 100000000 def test_job_script(): @@ -98,4 +98,4 @@ def test_config_name_htcondor_takes_custom_config(): with dask.config.set({"jobqueue.htcondor-config-name": conf}): with HTCondorCluster(config_name="htcondor-config-name") as cluster: - assert cluster.name == "myname" + assert cluster.job_name == "myname" diff --git a/dask_jobqueue/tests/test_job.py b/dask_jobqueue/tests/test_job.py new file mode 100644 index 00000000..71f26f09 --- /dev/null +++ b/dask_jobqueue/tests/test_job.py @@ -0,0 +1,180 @@ +import asyncio +from time import time + +from dask_jobqueue import ( + PBSCluster, + SGECluster, + SLURMCluster, + LSFCluster, + HTCondorCluster, + MoabCluster, + OARCluster, +) +from dask_jobqueue.local import LocalJob, LocalCluster +from dask_jobqueue.pbs import PBSJob +from dask_jobqueue.sge import SGEJob +from dask_jobqueue.slurm import SLURMJob +from dask_jobqueue.lsf import LSFJob +from dask_jobqueue.moab import MoabJob +from dask_jobqueue.htcondor import HTCondorJob +from dask_jobqueue.oar import OARJob + +from dask_jobqueue.core import JobQueueCluster +from dask.distributed import Scheduler, Client + +import pytest + + +def test_basic(): + job = PBSJob(scheduler="127.0.0.1:12345", cores=1, memory="1 GB") + assert "127.0.0.1:12345" in job.job_script() + + +job_protected = [ + pytest.param(SGEJob, marks=[pytest.mark.env("sge")]), + pytest.param(PBSJob, marks=[pytest.mark.env("pbs")]), + pytest.param(SLURMJob, marks=[pytest.mark.env("slurm")]), + pytest.param(LSFJob, marks=[pytest.mark.env("lsf")]), + LocalJob, +] + + +all_jobs = [SGEJob, PBSJob, SLURMJob, LSFJob, HTCondorJob, MoabJob, OARJob] +all_clusters = [ + SGECluster, + PBSCluster, + SLURMCluster, + LSFCluster, + HTCondorCluster, + MoabCluster, + OARCluster, +] + + +@pytest.mark.parametrize("job_cls", job_protected) +@pytest.mark.asyncio +async def test_job(job_cls): + async with Scheduler(port=0) as s: + job = job_cls(scheduler=s.address, name="foo", cores=1, memory="1GB") + job = await job + async with Client(s.address, asynchronous=True) as client: + await client.wait_for_workers(1) + assert list(s.workers.values())[0].name == "foo" + + await job.close() + + start = time() + while len(s.workers): + await asyncio.sleep(0.1) + assert time() < start + 10 + + +@pytest.mark.parametrize("job_cls", job_protected) +@pytest.mark.asyncio +async def test_cluster(job_cls): + async with JobQueueCluster( + 1, cores=1, memory="1GB", job_cls=job_cls, asynchronous=True, name="foo" + ) as cluster: + async with Client(cluster, asynchronous=True) as client: + assert len(cluster.workers) == 1 + cluster.scale(jobs=2) + await cluster + assert len(cluster.workers) == 2 + assert all(isinstance(w, job_cls) for w in cluster.workers.values()) + assert all(w.status == "running" for w in cluster.workers.values()) + await client.wait_for_workers(2) + + cluster.scale(1) + start = time() + await cluster + while len(cluster.scheduler.workers) > 1: + await asyncio.sleep(0.1) + assert time() < start + 10 + + +@pytest.mark.parametrize("job_cls", job_protected) +@pytest.mark.asyncio +async def test_adapt(job_cls): + async with JobQueueCluster( + 1, cores=1, memory="1GB", job_cls=job_cls, asynchronous=True, name="foo" + ) as cluster: + async with Client(cluster, asynchronous=True) as client: + await client.wait_for_workers(1) + cluster.adapt(minimum=0, maximum=4, interval="10ms") + + start = time() + while len(cluster.scheduler.workers) or cluster.workers: + await asyncio.sleep(0.050) + assert time() < start + 10 + assert not cluster.worker_spec + assert not cluster.workers + + future = client.submit(lambda: 0) + await client.wait_for_workers(1) + + del future + + start = time() + while len(cluster.scheduler.workers) or cluster.workers: + await asyncio.sleep(0.050) + assert time() < start + 10 + assert not cluster.worker_spec + assert not cluster.workers + + +@pytest.mark.parametrize("job_cls", job_protected) +@pytest.mark.asyncio +async def test_adapt_parameters(job_cls): + async with JobQueueCluster( + cores=2, memory="1GB", processes=2, job_cls=job_cls, asynchronous=True + ) as cluster: + adapt = cluster.adapt(minimum=2, maximum=4, interval="10ms") + await adapt.adapt() + await cluster + assert len(cluster.workers) == 1 # 2 workers, 4 jobs + + adapt = cluster.adapt(minimum_jobs=2, maximum_jobs=4, interval="10ms") + await adapt.adapt() + await cluster + assert len(cluster.workers) == 2 # 2 workers, 4 jobs + + +def test_header_lines_skip(): + job = PBSJob(cores=1, memory="1GB", job_name="foobar") + assert "foobar" in job.job_script() + + job = PBSJob(cores=1, memory="1GB", job_name="foobar", header_skip=["-N"]) + assert "foobar" not in job.job_script() + + +@pytest.mark.asyncio +async def test_nprocs_scale(): + async with LocalCluster( + cores=2, memory="4GB", processes=2, asynchronous=True + ) as cluster: + s = cluster.scheduler + async with Client(cluster, asynchronous=True) as client: + cluster.scale(cores=2) + await cluster + await client.wait_for_workers(2) + assert len(cluster.workers) == 1 # two workers, one job + assert len(s.workers) == 2 + assert cluster.plan == {ws.name for ws in s.workers.values()} + + cluster.scale(cores=1) + await cluster + await asyncio.sleep(0.2) + assert len(cluster.scheduler.workers) == 2 # they're still one group + + cluster.scale(jobs=2) + assert len(cluster.worker_spec) == 2 + cluster.scale(5) + assert len(cluster.worker_spec) == 3 + cluster.scale(1) + assert len(cluster.worker_spec) == 1 + + +@pytest.mark.parametrize("Cluster", all_clusters) +def test_docstring_cluster(Cluster): + assert "cores :" in Cluster.__doc__ + assert Cluster.__name__[: -len("Cluster")] in Cluster.__doc__ diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index b8a6dfbe..c4acd785 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -16,23 +16,23 @@ OARCluster, ) +from dask_jobqueue.sge import SGEJob + def test_errors(): - with pytest.raises(NotImplementedError) as info: + with pytest.raises(ValueError, match="Job type.*job_cls="): JobQueueCluster(cores=4) - assert "abstract class" in str(info.value) - def test_command_template(): with PBSCluster(cores=2, memory="4GB") as cluster: assert ( "%s -m distributed.cli.dask_worker" % (sys.executable) - in cluster._command_template + in cluster._dummy_job._command_template ) - assert " --nthreads 2" in cluster._command_template - assert " --memory-limit " in cluster._command_template - assert " --name " in cluster._command_template + assert " --nthreads 2" in cluster._dummy_job._command_template + assert " --memory-limit " in cluster._dummy_job._command_template + assert " --name " in cluster._dummy_job._command_template with PBSCluster( cores=2, @@ -41,9 +41,9 @@ def test_command_template(): local_directory="/scratch", extra=["--preload", "mymodule"], ) as cluster: - assert " --death-timeout 60" in cluster._command_template - assert " --local-directory /scratch" in cluster._command_template - assert " --preload mymodule" in cluster._command_template + assert " --death-timeout 60" in cluster._dummy_job._command_template + assert " --local-directory /scratch" in cluster._dummy_job._command_template + assert " --preload mymodule" in cluster._dummy_job._command_template @pytest.mark.parametrize( @@ -62,20 +62,6 @@ def test_shebang_settings(Cluster): assert job_script.startswith(default_shebang) -@pytest.mark.parametrize( - "Cluster", [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster] -) -def test_repr(Cluster): - with Cluster( - walltime="00:02:00", processes=4, cores=8, memory="28GB", name="dask-worker" - ) as cluster: - cluster_repr = repr(cluster) - assert cluster.__class__.__name__ in cluster_repr - assert "cores=0" in cluster_repr - assert "memory=0 B" in cluster_repr - assert "workers=0" in cluster_repr - - @pytest.mark.parametrize( "Cluster", [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster] ) @@ -94,18 +80,16 @@ def test_forward_ip(): name="dask-worker", host=ip, ) as cluster: - assert cluster.local_cluster.scheduler.ip == ip + assert cluster.scheduler.ip == ip default_ip = socket.gethostbyname("") with PBSCluster( walltime="00:02:00", processes=4, cores=8, memory="28GB", name="dask-worker" ) as cluster: - assert cluster.local_cluster.scheduler.ip == default_ip + assert cluster.scheduler.ip == default_ip -@pytest.mark.parametrize( - "Cluster", [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster] -) +@pytest.mark.parametrize("Cluster", []) @pytest.mark.parametrize( "qsub_return_string", [ @@ -117,17 +101,34 @@ def test_forward_ip(): "{job_id}", ], ) -def test_job_id_from_qsub(Cluster, qsub_return_string): +def test_job_id_from_qsub_legacy(Cluster, qsub_return_string): original_job_id = "654321" qsub_return_string = qsub_return_string.format(job_id=original_job_id) with Cluster(cores=1, memory="1GB") as cluster: assert original_job_id == cluster._job_id_from_submit_output(qsub_return_string) +@pytest.mark.parametrize("job_cls", [SGEJob]) @pytest.mark.parametrize( - "Cluster", [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster] + "qsub_return_string", + [ + "{job_id}.admin01", + "Request {job_id}.asdf was sumbitted to queue: standard.", + "sbatch: Submitted batch job {job_id}", + "{job_id};cluster", + "Job <{job_id}> is submitted to default queue .", + "{job_id}", + ], ) -def test_job_id_error_handling(Cluster): +def test_job_id_from_qsub(job_cls, qsub_return_string): + original_job_id = "654321" + qsub_return_string = qsub_return_string.format(job_id=original_job_id) + job = job_cls(cores=1, memory="1GB") + assert original_job_id == job._job_id_from_submit_output(qsub_return_string) + + +@pytest.mark.parametrize("Cluster", []) +def test_job_id_error_handling_legacy(Cluster): # non-matching regexp with Cluster(cores=1, memory="1GB") as cluster: with pytest.raises(ValueError, match="Could not parse job id"): @@ -142,6 +143,22 @@ def test_job_id_error_handling(Cluster): cluster._job_id_from_submit_output(return_string) +@pytest.mark.parametrize("job_cls", [SGEJob]) +def test_job_id_error_handling(job_cls): + # non-matching regexp + job = job_cls(cores=1, memory="1GB") + with pytest.raises(ValueError, match="Could not parse job id"): + return_string = "there is no number here" + job._job_id_from_submit_output(return_string) + + # no job_id named group in the regexp + job = job_cls(cores=1, memory="1GB") + with pytest.raises(ValueError, match="You need to use a 'job_id' named group"): + return_string = "Job <12345> submitted to ." + job.job_id_regexp = r"(\d+)" + job._job_id_from_submit_output(return_string) + + def test_log_directory(tmpdir): shutil.rmtree(tmpdir.strpath, ignore_errors=True) with PBSCluster(cores=1, memory="1GB"): @@ -151,6 +168,7 @@ def test_log_directory(tmpdir): assert os.path.exists(tmpdir.strpath) +@pytest.mark.skip def test_jobqueue_cluster_call(tmpdir): cluster = PBSCluster(cores=1, memory="1GB") @@ -180,12 +198,11 @@ def test_jobqueue_cluster_call(tmpdir): [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster], ) def test_cluster_has_cores_and_memory(Cluster): - cls_name = Cluster.__name__ + r"\(" - with pytest.raises(ValueError, match=cls_name + r"cores=\d, memory='\d+GB'"): + with pytest.raises(ValueError, match=r"cores=\d, memory='\d+GB'"): Cluster() - with pytest.raises(ValueError, match=cls_name + r"cores=\d, memory='1GB'"): + with pytest.raises(ValueError, match=r"cores=\d, memory='1GB'"): Cluster(memory="1GB") - with pytest.raises(ValueError, match=cls_name + r"cores=4, memory='\d+GB'"): + with pytest.raises(ValueError, match=r"cores=4, memory='\d+GB'"): Cluster(cores=4) diff --git a/dask_jobqueue/tests/test_lsf.py b/dask_jobqueue/tests/test_lsf.py index bd3ca5e7..7915a1bb 100644 --- a/dask_jobqueue/tests/test_lsf.py +++ b/dask_jobqueue/tests/test_lsf.py @@ -25,7 +25,6 @@ def test_header(): assert "#BSUB -W 00:02" in cluster.job_header assert "#BSUB -q" not in cluster.job_header assert "#BSUB -P" not in cluster.job_header - assert "--name dask-worker--${JOB_ID}--" in cluster.job_script() with LSFCluster( queue="general", @@ -249,7 +248,7 @@ def test_config_name_lsf_takes_custom_config(): with dask.config.set({"jobqueue.lsf-config-name": conf}): with LSFCluster(config_name="lsf-config-name") as cluster: - assert cluster.name == "myname" + assert cluster.job_name == "myname" def test_informative_errors(): diff --git a/dask_jobqueue/tests/test_oar.py b/dask_jobqueue/tests/test_oar.py index b7eaaa7e..5035852b 100644 --- a/dask_jobqueue/tests/test_oar.py +++ b/dask_jobqueue/tests/test_oar.py @@ -110,4 +110,4 @@ def test_config_name_oar_takes_custom_config(): with dask.config.set({"jobqueue.oar-config-name": conf}): with OARCluster(config_name="oar-config-name") as cluster: - assert cluster.name == "myname" + assert cluster.job_name == "myname" diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index f7abb53f..2e92162c 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -22,7 +22,6 @@ def test_header(Cluster): assert "#PBS -l walltime=00:02:00" in cluster.job_header assert "#PBS -q" not in cluster.job_header assert "#PBS -A" not in cluster.job_header - assert "--name dask-worker--${JOB_ID}--" in cluster.job_script() with Cluster( queue="regular", @@ -113,15 +112,11 @@ def test_basic(loop): with Client(cluster) as client: cluster.scale(2) - - start = time() - while not (cluster.pending_jobs or cluster.running_jobs): - sleep(0.100) - assert time() < start + QUEUE_WAIT + client.wait_for_workers(2) future = client.submit(lambda x: x + 1, 10) assert future.result(QUEUE_WAIT) == 11 - assert cluster.running_jobs + # assert cluster.running_jobs workers = list(client.scheduler_info()["workers"].values()) w = workers[0] @@ -131,11 +126,11 @@ def test_basic(loop): cluster.scale(0) start = time() - while cluster.running_jobs: + while client.scheduler_info()["workers"]: sleep(0.100) assert time() < start + QUEUE_WAIT - assert not cluster.running_jobs + assert not cluster.workers and not cluster.worker_spec @pytest.mark.env("pbs") @@ -152,15 +147,11 @@ def test_scale_cores_memory(loop): with Client(cluster) as client: cluster.scale(cores=2) - - start = time() - while not (cluster.pending_jobs or cluster.running_jobs): - sleep(0.100) - assert time() < start + QUEUE_WAIT + client.wait_for_workers(1) future = client.submit(lambda x: x + 1, 10) assert future.result(QUEUE_WAIT) == 11 - assert cluster.running_jobs + assert cluster.workers workers = list(client.scheduler_info()["workers"].values()) w = workers[0] @@ -170,11 +161,11 @@ def test_scale_cores_memory(loop): cluster.scale(memory="0GB") start = time() - while cluster.running_jobs: + while client.scheduler_info()["workers"]: sleep(0.100) assert time() < start + QUEUE_WAIT - assert not cluster.running_jobs + assert not cluster.workers @pytest.mark.env("pbs") @@ -195,11 +186,11 @@ def test_basic_scale_edge_cases(loop): # Wait to see what happens sleep(0.2) start = time() - while cluster.pending_jobs or cluster.running_jobs: + while cluster.workers: sleep(0.1) assert time() < start + QUEUE_WAIT - assert not (cluster.pending_jobs or cluster.running_jobs) + assert not cluster.workers @pytest.mark.env("pbs") @@ -219,7 +210,7 @@ def test_adaptive(loop): assert future.result(QUEUE_WAIT) == 11 start = time() - processes = cluster.worker_processes + processes = cluster._dummy_job.worker_processes while len(client.scheduler_info()["workers"]) != processes: sleep(0.1) assert time() < start + QUEUE_WAIT @@ -227,12 +218,10 @@ def test_adaptive(loop): del future start = time() - while cluster.pending_jobs or cluster.running_jobs: + while client.scheduler_info()["workers"] or cluster.workers: sleep(0.100) assert time() < start + QUEUE_WAIT - assert cluster.finished_jobs - @pytest.mark.env("pbs") def test_adaptive_grouped(loop): @@ -247,21 +236,13 @@ def test_adaptive_grouped(loop): ) as cluster: cluster.adapt(minimum=1) # at least 1 worker with Client(cluster) as client: - start = time() - while not (cluster.pending_jobs or cluster.running_jobs): - sleep(0.100) - assert time() < start + QUEUE_WAIT + client.wait_for_workers(1) future = client.submit(lambda x: x + 1, 10) assert future.result(QUEUE_WAIT) == 11 start = time() - while not cluster.running_jobs: - sleep(0.100) - assert time() < start + QUEUE_WAIT - - start = time() - processes = cluster.worker_processes + processes = cluster._dummy_job.worker_processes while len(client.scheduler_info()["workers"]) != processes: sleep(0.1) assert time() < start + QUEUE_WAIT @@ -284,7 +265,7 @@ def test_adaptive_cores_mem(loop): assert future.result(QUEUE_WAIT) == 11 start = time() - processes = cluster.worker_processes + processes = cluster._dummy_job.worker_processes while len(client.scheduler_info()["workers"]) != processes: sleep(0.1) assert time() < start + QUEUE_WAIT @@ -292,12 +273,10 @@ def test_adaptive_cores_mem(loop): del future start = time() - while cluster.pending_jobs or cluster.running_jobs: + while cluster.workers: sleep(0.100) assert time() < start + QUEUE_WAIT - assert cluster.finished_jobs - @pytest.mark.env("pbs") def test_scale_grouped(loop): @@ -315,9 +294,6 @@ def test_scale_grouped(loop): cluster.scale(4) # Start 2 jobs start = time() - while len(cluster.running_jobs) != 2: - sleep(0.100) - assert time() < start + QUEUE_WAIT while len(list(client.scheduler_info()["workers"].values())) != 4: sleep(0.100) @@ -325,7 +301,7 @@ def test_scale_grouped(loop): future = client.submit(lambda x: x + 1, 10) assert future.result(QUEUE_WAIT) == 11 - assert cluster.running_jobs + # assert cluster.running_jobs workers = list(client.scheduler_info()["workers"].values()) w = workers[0] @@ -336,23 +312,19 @@ def test_scale_grouped(loop): cluster.scale(1) # Should leave 2 workers, 1 job start = time() - while len(cluster.running_jobs) != 1: + while len(client.scheduler_info()["workers"]) != 2: sleep(0.100) assert time() < start + QUEUE_WAIT - assert len(cluster.running_jobs) == 1 - workers = list(client.scheduler_info()["workers"].values()) - assert len(workers) == 2 - cluster.scale(0) start = time() - while cluster.running_jobs: + + assert not cluster.worker_spec + while len(client.scheduler_info()["workers"]) != 0: sleep(0.100) assert time() < start + QUEUE_WAIT - assert not cluster.running_jobs - def test_config(loop): with dask.config.set( @@ -388,7 +360,7 @@ def test_config_name_pbs_takes_custom_config(): with dask.config.set({"jobqueue.pbs-config-name": conf}): with PBSCluster(config_name="pbs-config-name") as cluster: - assert cluster.name == "myname" + assert cluster.job_name == "myname" def test_informative_errors(): @@ -401,6 +373,7 @@ def test_informative_errors(): assert "cores" in str(info.value) -def test_adapt(loop): - with PBSCluster(loop, cores=1, memory="1 GB") as cluster: +@pytest.mark.asyncio +async def test_adapt(loop): + async with PBSCluster(cores=1, memory="1 GB", asynchronous=True) as cluster: cluster.adapt() diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index 0d8dac75..6020dabb 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -19,13 +19,13 @@ def test_basic(loop): cluster.scale(2) start = time() - while not (cluster.pending_jobs or cluster.running_jobs): + while not client.scheduler_info()["workers"]: sleep(0.100) assert time() < start + QUEUE_WAIT future = client.submit(lambda x: x + 1, 10) assert future.result(QUEUE_WAIT) == 11 - assert cluster.running_jobs + assert len(client.scheduler_info()["workers"]) > 0 workers = list(client.scheduler_info()["workers"].values()) w = workers[0] @@ -35,7 +35,7 @@ def test_basic(loop): cluster.scale(0) start = time() - while cluster.running_jobs: + while client.scheduler_info()["workers"]: sleep(0.100) assert time() < start + QUEUE_WAIT @@ -65,7 +65,7 @@ def test_config_name_sge_takes_custom_config(): with dask.config.set({"jobqueue.sge-config-name": conf}): with SGECluster(config_name="sge-config-name") as cluster: - assert cluster.name == "myname" + assert cluster.job_name == "myname" def test_job_script(tmpdir): @@ -107,19 +107,20 @@ def test_complex_cancel_command(loop): with SGECluster( walltime="00:02:00", cores=1, processes=1, memory="2GB", loop=loop ) as cluster: - username = "root" - cluster.cancel_command = "qdel -u {}".format(username) + with Client(cluster) as client: + username = "root" + cluster.cancel_command = "qdel -u {}".format(username) - cluster.scale(2) + cluster.scale(2) - start = time() - while not cluster.running_jobs: - sleep(0.100) - assert time() < start + QUEUE_WAIT + start = time() + while not client.scheduler_info()["workers"]: + sleep(0.100) + assert time() < start + QUEUE_WAIT - cluster.stop_all_jobs() + cluster.scale(0) - start = time() - while cluster.running_jobs: - sleep(0.100) - assert time() < start + QUEUE_WAIT + start = time() + while client.scheduler_info()["workers"]: + sleep(0.100) + assert time() < start + QUEUE_WAIT diff --git a/dask_jobqueue/tests/test_slurm.py b/dask_jobqueue/tests/test_slurm.py index 16065f18..6b26ffc0 100644 --- a/dask_jobqueue/tests/test_slurm.py +++ b/dask_jobqueue/tests/test_slurm.py @@ -23,7 +23,7 @@ def test_header(): assert "#SBATCH --mem=27G" in cluster.job_header assert "#SBATCH -t 00:02:00" in cluster.job_header assert "#SBATCH -p" not in cluster.job_header - assert "#SBATCH -A" not in cluster.job_header + # assert "#SBATCH -A" not in cluster.job_header with SLURMCluster( queue="regular", @@ -49,7 +49,7 @@ def test_header(): assert "#SBATCH -n 1" in cluster.job_header assert "#SBATCH -t " in cluster.job_header assert "#SBATCH -p" not in cluster.job_header - assert "#SBATCH -A" not in cluster.job_header + # assert "#SBATCH -A" not in cluster.job_header def test_job_script(): @@ -66,7 +66,7 @@ def test_job_script(): assert "#SBATCH --mem=27G" in job_script assert "#SBATCH -t 00:02:00" in job_script assert "#SBATCH -p" not in job_script - assert "#SBATCH -A" not in job_script + # assert "#SBATCH -A" not in job_script assert "export " not in job_script @@ -95,7 +95,7 @@ def test_job_script(): assert "#SBATCH --mem=27G" in job_script assert "#SBATCH -t 00:02:00" in job_script assert "#SBATCH -p" not in job_script - assert "#SBATCH -A" not in job_script + # assert "#SBATCH -A" not in job_script assert 'export LANG="en_US.utf8"' in job_script assert 'export LANGUAGE="en_US.utf8"' in job_script @@ -115,7 +115,7 @@ def test_basic(loop): cores=2, processes=1, memory="2GB", - job_extra=["-D /"], + # job_extra=["-D /"], loop=loop, ) as cluster: with Client(cluster) as client: @@ -123,13 +123,10 @@ def test_basic(loop): cluster.scale(2) start = time() - while not (cluster.pending_jobs or cluster.running_jobs): - sleep(0.100) - assert time() < start + QUEUE_WAIT + client.wait_for_workers(2) future = client.submit(lambda x: x + 1, 10) assert future.result(QUEUE_WAIT) == 11 - assert cluster.running_jobs workers = list(client.scheduler_info()["workers"].values()) w = workers[0] @@ -139,7 +136,7 @@ def test_basic(loop): cluster.scale(0) start = time() - while cluster.running_jobs: + while client.scheduler_info()["workers"]: sleep(0.100) assert time() < start + QUEUE_WAIT @@ -151,7 +148,7 @@ def test_adaptive(loop): cores=2, processes=1, memory="2GB", - job_extra=["-D /"], + # job_extra=["-D /"], loop=loop, ) as cluster: cluster.adapt() @@ -159,27 +156,17 @@ def test_adaptive(loop): future = client.submit(lambda x: x + 1, 10) start = time() - while not (cluster.pending_jobs or cluster.running_jobs): - sleep(0.100) - assert time() < start + QUEUE_WAIT + client.wait_for_workers(1) assert future.result(QUEUE_WAIT) == 11 - start = time() - processes = cluster.worker_processes - while len(client.scheduler_info()["workers"]) != processes: - sleep(0.1) - assert time() < start + QUEUE_WAIT - del future start = time() - while cluster.running_jobs: + while client.scheduler_info()["workers"]: sleep(0.100) assert time() < start + QUEUE_WAIT - assert cluster.finished_jobs - def test_config_name_slurm_takes_custom_config(): conf = { @@ -205,4 +192,4 @@ def test_config_name_slurm_takes_custom_config(): with dask.config.set({"jobqueue.slurm-config-name": conf}): with SLURMCluster(config_name="slurm-config-name") as cluster: - assert cluster.name == "myname" + assert cluster.job_name == "myname" diff --git a/docs/environment.yml b/docs/environment.yml index bf834ebf..7d9241cd 100644 --- a/docs/environment.yml +++ b/docs/environment.yml @@ -4,7 +4,6 @@ channels: dependencies: - python=3.6 - distributed - - docrep - numpydoc - ipython - sphinx diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 97ddc4a2..069bb6e2 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -1,6 +1,25 @@ Changelog ========= +0.7.0 / 2019-XX-XX +------------------ + +- Base Dask-Jobqueue on top of the core ``dask.distributed.SpecCluster`` class + (:pr:`307`) + + This is nearly complete reimplementation of the dask-jobqueue logic on top + of more centralized logic. This improves standardization and adds new + features, but does include the following **breaking changes**: + + - The ``cluster.stop_all_jobs()`` method has been removed. + Please use ``cluster.scale(0)`` instead. + - The attributes ``running_jobs``, ``pending_jobs``, and + ``cancelled_jobs`` have been removed. These have been moved upstream to + the ``dask.distributed.SpecCluster`` class instead as ``workers`` and + ``worker_spec``, as well as ``.plan``, ``.requested``, and ``.observed``. + - The ``name`` attribute has been moved to ``job_name``. + + 0.6.3 / 2019-08-18 ------------------ diff --git a/docs/source/configuration.rst b/docs/source/configuration.rst index e52b75d8..eb867c64 100644 --- a/docs/source/configuration.rst +++ b/docs/source/configuration.rst @@ -30,12 +30,20 @@ define a single job: Note that the ``cores`` and ``memory`` keywords above correspond not to your full desired deployment, but rather to the size of a *single job* which should -be no larger than the size of a single machine in your cluster. Separately you -will specify how many jobs to deploy using the scale method. +be no larger than the size of a single machine in your cluster. + +Separately you will specify how many jobs to deploy using the scale method. +You can either specify the number of workers, or the total number of cores or +memory that you want. .. code-block:: python - cluster.scale(12) # launch 12 workers (2 jobs of 6 workers each) of the specification provided above + cluster.scale(jobs=2) # launch 2 workers, each of which starts 6 worker processes + cluster.scale(cores=48) # Or specify cores or memory directly + cluster.scale(memory="200 GB") # Or specify cores or memory directly + +These all accomplish the same thing. You can chose whichever makes the most +sense to you. Configuration Files diff --git a/docs/source/howitworks.rst b/docs/source/howitworks.rst index 00c26b3a..3e0a2ca2 100644 --- a/docs/source/howitworks.rst +++ b/docs/source/howitworks.rst @@ -23,11 +23,20 @@ object is instantiated: walltime='02:00:00', ) -You then ask for more workers using the ``scale`` command: +These parameters specify the characteristics of a *single job* or a *single +compute node*, rather than the characteristics of your computation as a whole. +It hasn't actually launched any jobs yet. +For the full computation, you will then ask for a number of jobs using the +``scale`` command: .. code-block:: python - cluster.scale(36) + cluster.scale(jobs=2) # launch 2 jobs, each of which starts 6 worker processes + cluster.scale(cores=48) # Or specify cores or memory directly + cluster.scale(memory="200 GB") # Or specify cores or memory directly + +You can either specify the number of jobs, or the total number of cores or +memory that you want. The cluster generates a traditional job script and submits that an appropriate number of times to the job queue. You can see the job script that it will diff --git a/docs/source/index.rst b/docs/source/index.rst index 3d0eec19..d4625452 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -19,7 +19,7 @@ Example from dask_jobqueue import PBSCluster cluster = PBSCluster() - cluster.scale(10) # Ask for ten workers + cluster.scale(jobs=10) # Deploy ten single-node jobs from dask.distributed import Client client = Client(cluster) # Connect this local process to remote workers @@ -45,7 +45,8 @@ save resources when not actively computing. .. code-block:: python - cluster.adapt(minimum=6, maximum=90) # auto-scale between 6 and 90 workers + cluster.adapt(minimum_jobs=10, maximum_jobs=100) # auto-scale between 10 and 100 jobs + cluster.adapt(maximum_memory="10 TB") # or use core/memory limits More details ------------ diff --git a/requirements.txt b/requirements.txt index ba31c0ce..87c3fc36 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ -dask>=2 -distributed>=2.1 -docrep +dask>=2.3 +distributed>=2.3