diff --git a/distributed/__init__.py b/distributed/__init__.py index 4f0ad28547f..e955ab8776e 100644 --- a/distributed/__init__.py +++ b/distributed/__init__.py @@ -5,7 +5,7 @@ from .deploy import LocalCluster from .diagnostics import progress from .client import (Client, Executor, CompatibleExecutor, - wait, as_completed, default_client) + wait, as_completed, default_client, fire_and_forget) from .nanny import Nanny from .queues import Queue from .scheduler import Scheduler diff --git a/distributed/client.py b/distributed/client.py index 3851b03b7e6..1c51b5ff023 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -2967,6 +2967,36 @@ def futures_of(o, client=None): return list(futures) +def fire_and_forget(obj): + """ Run tasks at least once, even if we release the futures + + Under normal operation Dask will not run any tasks for which there is not + an active future (this avoids unnecessary work in many situations). + However sometimes you want to just fire off a task, not track its future, + and expect it to finish eventually. You can use this function on a future + or collection of futures to ask Dask to complete the task even if no active + client is tracking it. + + The results will not be kept in memory after the task completes (unless + there is an active future) so this is only useful for tasks that depend on + side effects. + + Parameters + ---------- + obj: Future, list, dict, dask collection + The futures that you want to run at least once + + Examples + -------- + >>> fire_and_forget(client.submit(func, *args)) # doctest: +SKIP + """ + futures = futures_of(obj) + for future in futures: + future.client._send_to_scheduler({'op': 'client-desires-keys', + 'keys': [tokey(future.key)], + 'client': 'fire-and-forget'}) + + @contextmanager def temp_default_client(c): """ Set the default client for the duration of the context diff --git a/distributed/scheduler.py b/distributed/scheduler.py index bb3a67369b1..79b8d54a666 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2405,6 +2405,9 @@ def transition_processing_memory(self, key, nbytes=None, type=None, self.task_state[key] = 'memory' + if key in self.wants_what['fire-and-forget']: + self.client_releases_keys(client='fire-and-forget', keys=[key]) + if self.validate: assert key not in self.rprocessing assert key not in self.waiting @@ -2650,6 +2653,9 @@ def transition_processing_erred(self, key, cause=None, exception=None, 'exception': self.exceptions[failing_key], 'traceback': self.tracebacks.get(failing_key)}) + if key in self.wants_what['fire-and-forget']: + self.client_releases_keys(client='fire-and-forget', keys=[key]) + if self.validate: assert key not in self.rprocessing diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 02865baddf4..237099f1f15 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -27,7 +27,7 @@ import dask from dask import delayed from dask.context import _globals -from distributed import Worker, Nanny, recreate_exceptions +from distributed import Worker, Nanny, recreate_exceptions, fire_and_forget from distributed.comm import CommClosedError from distributed.utils_comm import WrappedKey from distributed.client import (Client, Future, _wait, @@ -1767,7 +1767,9 @@ def test_multi_client(s, a, b): yield wait([x, y]) - assert s.wants_what == {c.id: {x.key, y.key}, f.id: {y.key}} + assert s.wants_what == {c.id: {x.key, y.key}, + f.id: {y.key}, + 'fire-and-forget': set()} assert s.who_wants == {x.key: {c.id}, y.key: {c.id, f.id}} yield c.shutdown() @@ -1835,7 +1837,9 @@ def test_multi_garbage_collection(s, a, b): yield gen.sleep(0.01) assert time() < start + 5 - assert s.wants_what == {c.id: {y.key}, f.id: {y.key}} + assert s.wants_what == {c.id: {y.key}, + f.id: {y.key}, + 'fire-and-forget': set()} assert s.who_wants == {y.key: {c.id, f.id}} y.__del__() @@ -1846,7 +1850,9 @@ def test_multi_garbage_collection(s, a, b): yield gen.sleep(0.1) assert y.key in a.data or y.key in b.data - assert s.wants_what == {c.id: {y.key}, f.id: set()} + assert s.wants_what == {c.id: {y.key}, + f.id: set(), + 'fire-and-forget': set()} assert s.who_wants == {y.key: {c.id}} y2.__del__() @@ -4168,6 +4174,46 @@ def __call__(self, *args): assert a.data and b.data +@gen_cluster(client=True) +def test_fire_and_forget(c, s, a, b): + future = c.submit(slowinc, 1, delay=0.1) + import distributed + + def f(x): + distributed.foo = 123 + + try: + fire_and_forget(c.submit(f, future)) + + start = time() + while not hasattr(distributed, 'foo'): + yield gen.sleep(0.01) + assert time() < start + 2 + assert distributed.foo == 123 + finally: + del distributed.foo + + start = time() + while len(s.task_state) > 1: + yield gen.sleep(0.01) + assert time() < start + 2 + + assert set(s.who_wants) == {future.key} + assert set(s.task_state) == {future.key} + + +@gen_cluster(client=True) +def test_fire_and_forget_err(c, s, a, b): + fire_and_forget(c.submit(div, 1, 0)) + yield gen.sleep(0.1) + + # erred task should clear out quickly + start = time() + while s.task_state: + yield gen.sleep(0.01) + assert time() < start + 1 + + def test_quiet_client_shutdown(loop): import logging with captured_logger(logging.getLogger('distributed')) as logger: diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index a5ddb19ba24..cdfbef8b094 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -29,7 +29,8 @@ def func(x): assert yy == 20 + 1 + (20 + 1) * 2 assert len(s.transition_log) > 10 - assert len(s.wants_what) == 1 + assert len([id for id in s.wants_what + if id.lower().startswith('client')]) == 1 @gen_cluster(client=True, ncores=[('127.0.0.1', 1)] * 2) diff --git a/docs/source/api.rst b/docs/source/api.rst index 8f6663eb086..bf74fef7ea5 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -78,6 +78,8 @@ API as_completed distributed.diagnostics.progress wait + fire_and_forget + Asynchronous methods --------------------