diff --git a/docs/source/advanced-tips-and-tricks.rst b/docs/source/advanced-tips-and-tricks.rst index 95b4d884..bd798106 100644 --- a/docs/source/advanced-tips-and-tricks.rst +++ b/docs/source/advanced-tips-and-tricks.rst @@ -69,6 +69,68 @@ accepted option on some SLURM clusters. The error was something like this: sbatch: error: Batch job submission failed: Requested node configuration is not available +How to handle job queueing system walltime killing workers +---------------------------------------------------------- +In dask-jobqueue, every worker process runs inside a job, and all jobs have a time limit in job queueing systems. +Reaching walltime can be troublesome in several cases: + +- when you don't have a lot of room on you HPC platform and have only a few workers at a time + (less than what you were hoping for when using scale or adapt). These workers will be + killed (and others started) before your workload ends. +- when you really don't know how long your workload will take: all your workers could be + killed before reaching the end. In this case, you'll want to use adaptive clusters so + that Dask ensures some workers are always up. + +If you don't set the proper parameters, you'll run into KilleWorker exception in those two cases. + +The solution to this problem is to tell Dask up front that the workers have a finite lifetime: + +- Use `--lifetime` worker option. This will enable infinite workloads using adaptive. + Workers will be properly shut down before the scheduling system kills them, and all their states moved. +- Use `--lifetime-stagger` when dealing with many workers (say > 20): this will prevent workers from + terminating at the same time, thus ease rebalancing tasks and scheduling burden. + +Here is an example of how to use these parameters: + +.. code-block:: python + + cluster = Cluster(walltime='01:00:00', cores=4, memory='16gb', extra=["--lifetime", "55m", "--lifetime-stagger", "4m"]) + cluster.adapt(minimum=0, maximum=200) + + +Here is an example of a workflow taking advantage of this, if you want to give it a try or adapt it to your use case: + +.. code-block:: python + + import time + import numpy as np + from dask_jobqueue import PBSCluster as Cluster + from dask import delayed + from dask.distributed import Client, as_completed + + # config in $HOME/.config/dask/jobqueue.yaml + cluster = Cluster(walltime='00:01:00', cores=1, memory='4gb') + cluster.adapt(minimum=0, maximum=4) + + client = Client(cluster) + + # each job takes 1s, and we have 4 cpus * 1 min * 60s/min = 240 cpu.s, let's ask for a little more tasks. + filenames = [f'img{num}.jpg' for num in range(480)] + + def features(num_fn): + num, image_fn = num_fn + time.sleep(1) # takes about 1s to compute features on an image + features = np.random.random(246) + return num, features + + num_files = len(filenames) + num_features = len(features((0, filenames[0]))[1]) # FIX + + X = np.zeros((num_files, num_features), dtype=np.float32) + + for future in as_completed(client.map(features, list(enumerate(filenames)))): # FIX + i, v = future.result() + X[i, :] = v