diff --git a/.github/workflows/workflows.yml b/.github/workflows/workflows.yml index 4892d64a..a3175fee 100644 --- a/.github/workflows/workflows.yml +++ b/.github/workflows/workflows.yml @@ -7,7 +7,6 @@ jobs: static_analysis: runs-on: ubuntu-latest - timeout-minutes: 10 defaults: run: @@ -31,7 +30,6 @@ jobs: tests: runs-on: ${{ matrix.os }} - timeout-minutes: 10 defaults: run: @@ -81,7 +79,6 @@ jobs: tests-wsl: runs-on: windows-latest - timeout-minutes: 10 defaults: run: diff --git a/doc/user_guides/dask.rst b/doc/user_guides/dask.rst index 7c3c5dda..b4587a78 100644 --- a/doc/user_guides/dask.rst +++ b/doc/user_guides/dask.rst @@ -126,7 +126,7 @@ file system. An example of that is from distributed.diagnostics.plugin import WorkerPlugin - class DaskWorkerPlugin(WorkerPlugin): + class CopyWorkerPlugin(WorkerPlugin): def __init__(self, tmp_dir, target_dir): self.tmp_dir = tmp_dir self.target_dir = target_dir @@ -137,11 +137,71 @@ file system. An example of that is class Worker(Component): def step(self, timestamp=0.0): cwd = self.services.get_working_dir() + tmp_xfs_dir = '/tmp' self.services.create_task_pool('pool') - self.services.add_task('pool', 'task_1', 1, '/tmp/', 'executable') + self.services.add_task('pool', 'task_1', 1, tmp_xfs_dir, 'executable') - worker_plugin = DaskWorkerPlugin('/tmp', cwd) + worker_plugin = CopyWorkerPlugin(tmp_xfs_dir, cwd) + + ret_val = self.services.submit_tasks('pool', + use_dask=True, use_shifter=True, + dask_worker_plugin=worker_plugin) + + exit_status = self.services.get_finished_tasks('pool') + + +where the batch script has the temporary XFS filesystem mounted as + +.. code-block:: bash + + #SBATCH --volume="/global/cscratch1/sd/$USER/tmpfiles:/tmp:perNodeCache=size=1G" + + +Continuous Archiving +^^^^^^^^^^^^^^^^^^^^ + +Another example is a WorkerPlugin that will continuously create a tar +archive of the output data at a regular interval while tasks are +executing. 
This is useful should the workflow fail or be canceled +before everything is finished. It creates a separate archive for each +node/worker since the temporary XFS filesystem is unique per +node. This example creates an archive of all the data in the working +directory every 60 seconds and again when everything is finished. + +.. code-block:: python + + def file_daemon(worker_id, evt, source_dir, target_dir): + cmd = f"tar -caf {target_dir}/{worker_id}_archive.tar.gz -C {source_dir} ." + + while not evt.wait(60): # interval at which to archive data + os.system(cmd) + + os.system(cmd) + + class ContinuousArchivingWorkerPlugin(WorkerPlugin): + def __init__(self, tmp_dir, target_dir): + self.tmp_dir = tmp_dir + self.target_dir = target_dir + + def setup(self, worker): + self.evt = Event() + self.thread = Thread(target=file_daemon, args=(worker.id, self.evt, self.tmp_dir, self.target_dir)) + self.thread.start() + + def teardown(self, worker): + self.evt.set() # tells the thread to exit + self.thread.join() + + class Worker(Component): + def step(self, timestamp=0.0): + cwd = self.services.get_working_dir() + tmp_xfs_dir = '/tmp' + + self.services.create_task_pool('pool') + self.services.add_task('pool', 'task_1', 1, tmp_xfs_dir, 'executable') + + worker_plugin = ContinuousArchivingWorkerPlugin(tmp_xfs_dir, cwd) ret_val = self.services.submit_tasks('pool', use_dask=True, use_shifter=True,