From 59033df8815c11d56840a865f741c4a8469d2fa4 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Thu, 29 Jun 2017 15:34:15 -0400 Subject: [PATCH 1/2] Add fire_and_forget function Run tasks at least once, even if we release the futures Under normal operation Dask will not run any tasks for which there is not an active future (this avoids unnecessary work in many situations). However sometimes you want to just fire off a task, not track its future, and expect it to finish eventually. You can use this function on a future or collection of futures to ask Dask to complete the task even if no active client is tracking it. The results will not be kept in memory after the task completes (unless there is an active future) so this is only useful for tasks that depend on side effects. Parameters ---------- obj: Future, list, dict, dask collection The futures that you want to run at least once Examples -------- >>> fire_and_forget(client.submit(func, *args)) Fixes https://github.com/dask/distributed/issues/1205 --- distributed/__init__.py | 2 +- distributed/client.py | 30 +++++++++++++++++ distributed/scheduler.py | 6 ++++ distributed/tests/test_client.py | 44 ++++++++++++++++++++++--- distributed/tests/test_worker_client.py | 3 +- docs/source/api.rst | 2 ++ 6 files changed, 81 insertions(+), 6 deletions(-) diff --git a/distributed/__init__.py b/distributed/__init__.py index 4f0ad28547f..e955ab8776e 100644 --- a/distributed/__init__.py +++ b/distributed/__init__.py @@ -5,7 +5,7 @@ from .deploy import LocalCluster from .diagnostics import progress from .client import (Client, Executor, CompatibleExecutor, - wait, as_completed, default_client) + wait, as_completed, default_client, fire_and_forget) from .nanny import Nanny from .queues import Queue from .scheduler import Scheduler diff --git a/distributed/client.py b/distributed/client.py index 3851b03b7e6..1c51b5ff023 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -2967,6 +2967,36 @@ def futures_of(o, client=None): return list(futures) +def fire_and_forget(obj): + """ Run tasks at least once, even if we release the futures + + Under normal operation Dask will not run any tasks for which there is not + an active future (this avoids unnecessary work in many situations). + However sometimes you want to just fire off a task, not track its future, + and expect it to finish eventually. You can use this function on a future + or collection of futures to ask Dask to complete the task even if no active + client is tracking it. + + The results will not be kept in memory after the task completes (unless + there is an active future) so this is only useful for tasks that depend on + side effects. + + Parameters + ---------- + obj: Future, list, dict, dask collection + The futures that you want to run at least once + + Examples + -------- + >>> fire_and_forget(client.submit(func, *args)) # doctest: +SKIP + """ + futures = futures_of(obj) + for future in futures: + future.client._send_to_scheduler({'op': 'client-desires-keys', + 'keys': [tokey(future.key)], + 'client': 'fire-and-forget'}) + + @contextmanager def temp_default_client(c): """ Set the default client for the duration of the context diff --git a/distributed/scheduler.py b/distributed/scheduler.py index bb3a67369b1..79b8d54a666 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2405,6 +2405,9 @@ def transition_processing_memory(self, key, nbytes=None, type=None, self.task_state[key] = 'memory' + if key in self.wants_what['fire-and-forget']: + self.client_releases_keys(client='fire-and-forget', keys=[key]) + if self.validate: assert key not in self.rprocessing assert key not in self.waiting @@ -2650,6 +2653,9 @@ def transition_processing_erred(self, key, cause=None, exception=None, 'exception': self.exceptions[failing_key], 'traceback': self.tracebacks.get(failing_key)}) + if key in self.wants_what['fire-and-forget']: + self.client_releases_keys(client='fire-and-forget', keys=[key]) + if self.validate: assert key not in self.rprocessing diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 02865baddf4..f7ab83104e9 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -27,7 +27,7 @@ import dask from dask import delayed from dask.context import _globals -from distributed import Worker, Nanny, recreate_exceptions +from distributed import Worker, Nanny, recreate_exceptions, fire_and_forget from distributed.comm import CommClosedError from distributed.utils_comm import WrappedKey from distributed.client import (Client, Future, _wait, @@ -1767,7 +1767,9 @@ def test_multi_client(s, a, b): yield wait([x, y]) - assert s.wants_what == {c.id: {x.key, y.key}, f.id: {y.key}} + assert s.wants_what == {c.id: {x.key, y.key}, + f.id: {y.key}, + 'fire-and-forget': set()} assert s.who_wants == {x.key: {c.id}, y.key: {c.id, f.id}} yield c.shutdown() @@ -1835,7 +1837,9 @@ def test_multi_garbage_collection(s, a, b): yield gen.sleep(0.01) assert time() < start + 5 - assert s.wants_what == {c.id: {y.key}, f.id: {y.key}} + assert s.wants_what == {c.id: {y.key}, + f.id: {y.key}, + 'fire-and-forget': set()} assert s.who_wants == {y.key: {c.id, f.id}} y.__del__() @@ -1846,7 +1850,9 @@ def test_multi_garbage_collection(s, a, b): yield gen.sleep(0.1) assert y.key in a.data or y.key in b.data - assert s.wants_what == {c.id: {y.key}, f.id: set()} + assert s.wants_what == {c.id: {y.key}, + f.id: set(), + 'fire-and-forget': set()} assert s.who_wants == {y.key: {c.id}} y2.__del__() @@ -4168,6 +4174,36 @@ def __call__(self, *args): assert a.data and b.data +@gen_cluster(client=True) +def test_fire_and_forget(c, s, a, b): + future = c.submit(slowinc, 1, delay=0.1) + import distributed + + def f(x): + distributed.foo = 123 + + try: + fire_and_forget(c.submit(f, future)) + + start = time() + while not hasattr(distributed, 'foo'): + yield gen.sleep(0.01) + assert time() < start + 2 + assert distributed.foo == 123 + finally: + del distributed.foo + + assert set(s.who_wants) == {future.key} + assert set(s.task_state) == {future.key} + + +@gen_cluster(client=True) +def test_fire_and_forget_err(c, s, a, b): + fire_and_forget(c.submit(div, 1, 0)) + yield gen.sleep(0.1) + assert not s.task_state + + def test_quiet_client_shutdown(loop): import logging with captured_logger(logging.getLogger('distributed')) as logger: diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index a5ddb19ba24..cdfbef8b094 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -29,7 +29,8 @@ def func(x): assert yy == 20 + 1 + (20 + 1) * 2 assert len(s.transition_log) > 10 - assert len(s.wants_what) == 1 + assert len([id for id in s.wants_what + if id.lower().startswith('client')]) == 1 @gen_cluster(client=True, ncores=[('127.0.0.1', 1)] * 2) diff --git a/docs/source/api.rst b/docs/source/api.rst index 8f6663eb086..bf74fef7ea5 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -78,6 +78,8 @@ API as_completed distributed.diagnostics.progress wait + fire_and_forget + Asynchronous methods -------------------- From 7616bc8800691091d75d24ff1110cc288a3c7bf4 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Thu, 29 Jun 2017 18:36:03 -0400 Subject: [PATCH 2/2] Make fire and forget tests more reliable on travis-ci --- distributed/tests/test_client.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index f7ab83104e9..237099f1f15 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -4193,6 +4193,11 @@ def f(x): finally: del distributed.foo + start = time() + while len(s.task_state) > 1: + yield gen.sleep(0.01) + assert time() < start + 2 + assert set(s.who_wants) == {future.key} assert set(s.task_state) == {future.key} @@ -4201,7 +4206,12 @@ def f(x): def test_fire_and_forget_err(c, s, a, b): fire_and_forget(c.submit(div, 1, 0)) yield gen.sleep(0.1) - assert not s.task_state + + # erred task should clear out quickly + start = time() + while s.task_state: + yield gen.sleep(0.01) + assert time() < start + 1 def test_quiet_client_shutdown(loop):