From f6f25d3180533d4ff42305ef06a3baaa1da9a1e2 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sun, 7 Oct 2018 14:50:14 -0400 Subject: [PATCH 1/3] Remove Joblib Dask Backend from codebase This code has moved to joblib itself This commits removes all relevant code and tests, and raises an informative error message instead. --- distributed/joblib.py | 301 ++----------------------------- distributed/tests/test_joblib.py | 295 ------------------------------ 2 files changed, 11 insertions(+), 585 deletions(-) delete mode 100644 distributed/tests/test_joblib.py diff --git a/distributed/joblib.py b/distributed/joblib.py index 517b82ebc28..fd81a8b078e 100644 --- a/distributed/joblib.py +++ b/distributed/joblib.py @@ -1,298 +1,19 @@ -from __future__ import print_function, division, absolute_import +msg = """ It is no longer necessary to `import dask_ml.joblib` or +`import distributed.joblib`. -import contextlib +This functionality has moved into the core Joblib codebase. -from distutils.version import LooseVersion -from uuid import uuid4 -import weakref +To use Joblib's Dask backend with Scikit-Learn >= 0.20.0 -from tornado import gen + from dask.distributed import Client + client = Client() -from .client import Client, _wait -from .utils import ignoring, funcname, itemgetter -from . import get_client, secede, rejoin -from .worker import thread_state -from .sizeof import sizeof + from sklearn.externals import joblib -# A user could have installed joblib, sklearn, both, or neither. Further, only -# joblib >= 0.10.0 supports backends, so we also need to check for that. This -# bit of logic is to ensure that we create and register the backend for all -# viable installations of joblib. -joblib = sk_joblib = None -with ignoring(ImportError): - import joblib - if LooseVersion(joblib.__version__) < '0.10.2': - joblib = None -with ignoring(ImportError): - import sklearn.externals.joblib as sk_joblib - if LooseVersion(sk_joblib.__version__) < '0.10.2': - sk_joblib = None + with joblib.parallel_backend('dask'): + # your scikit-learn code -_bases = [] -if joblib: - from joblib.parallel import AutoBatchingMixin, ParallelBackendBase - _bases.append(ParallelBackendBase) -if sk_joblib: - from sklearn.externals.joblib.parallel import (AutoBatchingMixin, # noqa - ParallelBackendBase) - _bases.append(ParallelBackendBase) -if not _bases: - raise RuntimeError("Joblib backend requires either `joblib` >= '0.10.2' " - " or `sklearn` > '0.17.1'. Please install or upgrade") +See http://ml.dask.org/joblib.html for more information.""" -def is_weakrefable(obj): - try: - weakref.ref(obj) - return True - except TypeError: - return False - - -class _WeakKeyDictionary: - """A variant of weakref.WeakKeyDictionary for unhashable objects. - - This datastructure is used to store futures for broadcasted data objects - such as large numpy arrays or pandas dataframes that are not hashable and - therefore cannot be used as keys of traditional python dicts. - - Futhermore using a dict with id(array) as key is not safe because the - Python is likely to reuse id of recently collected arrays. - """ - - def __init__(self): - self._data = {} - - def __getitem__(self, obj): - ref, val = self._data[id(obj)] - if ref() is not obj: - # In case of a race condition with on_destroy. - raise KeyError(obj) - return val - - def __setitem__(self, obj, value): - key = id(obj) - try: - ref, _ = self._data[key] - if ref() is not obj: - # In case of race condition with on_destroy. - raise KeyError(obj) - except KeyError: - # Insert the new entry in the mapping along with a weakref - # callback to automatically delete the entry from the mapping - # as soon as the object used as key is garbage collected. - def on_destroy(_): - del self._data[key] - ref = weakref.ref(obj, on_destroy) - self._data[key] = ref, value - - def __len__(self): - return len(self._data) - - def clear(self): - self._data.clear() - - -def joblib_funcname(x): - try: - # Can't do isinstance, since joblib is often bundled in packages, and - # separate installs will have non-equivalent types. - if type(x).__name__ == 'BatchedCalls': - x = x.items[0][0] - except Exception: - pass - return funcname(x) - - -class Batch(object): - def __init__(self, tasks): - self.tasks = tasks - - def __call__(self, *data): - results = [] - for func, args, kwargs in self.tasks: - args = [a(data) if isinstance(a, itemgetter) else a - for a in args] - kwargs = {k: v(data) if isinstance(v, itemgetter) else v - for (k, v) in kwargs.items()} - results.append(func(*args, **kwargs)) - return results - - def __reduce__(self): - return (Batch, (self.tasks,)) - - -class DaskDistributedBackend(ParallelBackendBase, AutoBatchingMixin): - MIN_IDEAL_BATCH_DURATION = 0.2 - MAX_IDEAL_BATCH_DURATION = 1.0 - - def __init__(self, scheduler_host=None, scatter=None, - client=None, loop=None, **submit_kwargs): - if client is None: - if scheduler_host: - client = Client(scheduler_host, loop=loop, set_as_default=False) - else: - try: - client = get_client() - except ValueError: - msg = ("To use Joblib with Dask first create a Dask Client" - "\n\n" - " from dask.distributed import Client\n" - " client = Client()\n" - "or\n" - " client = Client('scheduler-address:8786')") - raise ValueError(msg) - - self.client = client - - if scatter is not None and not isinstance(scatter, (list, tuple)): - raise TypeError("scatter must be a list/tuple, got " - "`%s`" % type(scatter).__name__) - - if scatter is not None and len(scatter) > 0: - # Keep a reference to the scattered data to keep the ids the same - self._scatter = list(scatter) - scattered = self.client.scatter(scatter, broadcast=True) - self.data_futures = {id(x): f for x, f in zip(scatter, scattered)} - else: - self._scatter = [] - self.data_futures = {} - self.task_futures = set() - self.submit_kwargs = submit_kwargs - - def __reduce__(self): - return (DaskDistributedBackend, ()) - - def get_nested_backend(self): - return DaskDistributedBackend() - - def configure(self, n_jobs=1, parallel=None, **backend_args): - return self.effective_n_jobs(n_jobs) - - def start_call(self): - self.call_data_futures = _WeakKeyDictionary() - - def stop_call(self): - # The explicit call to clear is required to break a cycling reference - # to the futures. - self.call_data_futures.clear() - - def effective_n_jobs(self, n_jobs): - return sum(self.client.ncores().values()) - - def _to_func_args(self, func): - collected_futures = [] - itemgetters = dict() - - # Futures that are dynamically generated during a single call to - # Parallel.__call__. - call_data_futures = getattr(self, 'call_data_futures', None) - - def maybe_to_futures(args): - for arg in args: - arg_id = id(arg) - if arg_id in itemgetters: - yield itemgetters[arg_id] - continue - - f = None - if f is None and arg_id in self.data_futures: - f = self.data_futures[arg_id] - - elif f is None and call_data_futures is not None: - try: - f = call_data_futures[arg] - except KeyError: - if is_weakrefable(arg) and sizeof(arg) > 1e6: - # Automatically scatter large objects to some of - # the workers to avoid duplicated data transfers. - # Rely on automated inter-worker data stealing if - # more workers need to reuse this data concurrently - # beyond the initial broadcast arity. - [f] = self.client.scatter([arg], broadcast=3) - call_data_futures[arg] = f - - if f is not None: - getter = itemgetter(len(collected_futures)) - collected_futures.append(f) - itemgetters[arg_id] = getter - arg = getter - yield arg - - tasks = [] - for f, args, kwargs in func.items: - args = list(maybe_to_futures(args)) - kwargs = dict(zip(kwargs.keys(), maybe_to_futures(kwargs.values()))) - tasks.append((f, args, kwargs)) - - if not collected_futures: - return func, () - return Batch(tasks), collected_futures - - def apply_async(self, func, callback=None): - key = '%s-batch-%s' % (joblib_funcname(func), uuid4().hex) - func, args = self._to_func_args(func) - - future = self.client.submit(func, *args, key=key, **self.submit_kwargs) - self.task_futures.add(future) - - @gen.coroutine - def callback_wrapper(): - result = yield _wait([future]) - self.task_futures.remove(future) - if callback is not None: - callback(result) # gets called in separate thread - - self.client.loop.add_callback(callback_wrapper) - - ref = weakref.ref(future) # avoid reference cycle - - def get(): - return ref().result() - - future.get = get # monkey patch to achieve AsyncResult API - return future - - def abort_everything(self, ensure_ready=True): - """ Tell the client to cancel any task submitted via this instance - - joblib.Parallel will never access those results - """ - self.client.cancel(self.task_futures) - self.task_futures.clear() - - @contextlib.contextmanager - def retrieval_context(self): - """Override ParallelBackendBase.retrieval_context to avoid deadlocks. - - This removes thread from the worker's thread pool (using 'secede'). - Seceding avoids deadlock in nested parallelism settings. - """ - # See 'joblib.Parallel.__call__' and 'joblib.Parallel.retrieve' for how - # this is used. - if hasattr(thread_state, 'execution_state'): - # we are in a worker. Secede to avoid deadlock. - secede() - - yield - - if hasattr(thread_state, 'execution_state'): - rejoin() - - -for base in _bases: - base.register(DaskDistributedBackend) - - -DistributedBackend = DaskDistributedBackend - - -# Register the backend with any available versions of joblib -if joblib: - joblib.register_parallel_backend('dask', DaskDistributedBackend) - joblib.register_parallel_backend('distributed', DaskDistributedBackend) - joblib.register_parallel_backend('dask.distributed', DaskDistributedBackend) -if sk_joblib: - sk_joblib.register_parallel_backend('dask', DaskDistributedBackend) - sk_joblib.register_parallel_backend('distributed', DaskDistributedBackend) - sk_joblib.register_parallel_backend('dask.distributed', DaskDistributedBackend) +raise ImportError(msg) diff --git a/distributed/tests/test_joblib.py b/distributed/tests/test_joblib.py deleted file mode 100644 index 8eec12899d6..00000000000 --- a/distributed/tests/test_joblib.py +++ /dev/null @@ -1,295 +0,0 @@ -from __future__ import print_function, division, absolute_import -import os -import importlib -from distutils.version import LooseVersion - -import pytest -from random import random -from time import sleep - -from distributed import Client -from distributed.metrics import time -from distributed.utils_test import cluster, inc -from distributed.utils_test import loop, client, cluster_fixture, s, a, b # noqa F401 -from toolz import identity - -distributed_joblib = pytest.importorskip('distributed.joblib') -joblib_funcname = distributed_joblib.joblib_funcname - - -@pytest.fixture(params=['joblib', 'sk_joblib']) -def joblib(request): - if request.param == 'joblib': - try: - this_joblib = importlib.import_module('joblib') - except ImportError: - pytest.skip("joblib not available") - else: - try: - this_joblib = importlib.import_module("sklearn.externals.joblib") - except ImportError: - pytest.skip("sklearn.externals.joblib not available") - - return this_joblib - - -def slow_raise_value_error(condition, duration=0.05): - sleep(duration) - if condition: - raise ValueError("condition evaluated to True") - - -def test_simple(client, joblib): - Parallel = joblib.Parallel - delayed = joblib.delayed - - with joblib.parallel_backend('dask') as (ba, _): - seq = Parallel()(delayed(inc)(i) for i in range(10)) - assert seq == [inc(i) for i in range(10)] - - with pytest.raises(ValueError): - Parallel()(delayed(slow_raise_value_error)(i == 3) - for i in range(10)) - - seq = Parallel()(delayed(inc)(i) for i in range(10)) - assert seq == [inc(i) for i in range(10)] - - -def random2(): - return random() - - -def test_dont_assume_function_purity(client, joblib): - Parallel = joblib.Parallel - delayed = joblib.delayed - - with joblib.parallel_backend('dask') as (ba, _): - x, y = Parallel()(delayed(random2)() for i in range(2)) - assert x != y - - -def test_joblib_funcname(client, joblib): - Parallel = joblib.Parallel - delayed = joblib.delayed - - with joblib.parallel_backend('dask') as (ba, _): - x, y = Parallel()(delayed(inc)(i) for i in range(2)) - - def f(dask_scheduler): - return list(dask_scheduler.transition_log) - log = client.run_on_scheduler(f) - assert all(tup[0].startswith('inc-batch') for tup in log) - - -def test_joblib_backend_subclass(joblib): - assert issubclass(distributed_joblib.DaskDistributedBackend, - joblib.parallel.ParallelBackendBase) - - -def add5(a, b, c, d=0, e=0): - return a + b + c + d + e - - -class CountSerialized(object): - def __init__(self, x): - self.x = x - self.count = 0 - - def __add__(self, other): - return self.x + getattr(other, 'x', other) - - __radd__ = __add__ - - def __reduce__(self): - self.count += 1 - return (CountSerialized, (self.x,)) - - -def test_joblib_scatter(client, joblib): - Parallel = joblib.Parallel - delayed = joblib.delayed - - x = CountSerialized(1) - y = CountSerialized(2) - z = CountSerialized(3) - - with joblib.parallel_backend('dask', scatter=[x, y]) as (ba, _): - f = delayed(add5) - tasks = [f(x, y, z, d=4, e=5), - f(x, z, y, d=5, e=4), - f(y, x, z, d=x, e=5), - f(z, z, x, d=z, e=y)] - sols = [func(*args, **kwargs) for func, args, kwargs in tasks] - results = Parallel()(tasks) - - # Scatter must take a list/tuple - with pytest.raises(TypeError): - with joblib.parallel_backend('dask', loop=loop, - scatter=1): - pass - - for l, r in zip(sols, results): - assert l == r - - # Scattered variables only serialized once - assert x.count == 1 - assert y.count == 1 - assert z.count == 4 - - -def test_nested_backend_context_manager(loop, joblib): - if LooseVersion(joblib.__version__) <= "0.11.0": - pytest.skip("Joblib >= 0.11.1 required for nested parallelism.") - Parallel = joblib.Parallel - delayed = joblib.delayed - - def get_nested_pids(): - pids = set(Parallel(n_jobs=2)(delayed(os.getpid)() for _ in range(2))) - pids |= set(Parallel(n_jobs=2)(delayed(os.getpid)() for _ in range(2))) - return pids - - with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop) as client: - with joblib.parallel_backend('dask') as (ba, _): - pid_groups = Parallel(n_jobs=2)( - delayed(get_nested_pids, check_pickle=False)() - for _ in range(10) - ) - for pid_group in pid_groups: - assert len(set(pid_group)) <= 2 - - # No deadlocks - with Client(s['address'], loop=loop) as client: - with joblib.parallel_backend('dask') as (ba, _): - pid_groups = Parallel(n_jobs=2)( - delayed(get_nested_pids, check_pickle=False)() - for _ in range(10) - ) - for pid_group in pid_groups: - assert len(set(pid_group)) <= 2 - - -def test_errors(loop, joblib): - with pytest.raises(ValueError) as info: - with joblib.parallel_backend('dask'): - pass - - assert "create a dask client" in str(info.value).lower() - - -def test_correct_nested_backend(client, joblib): - if LooseVersion(joblib.__version__) <= LooseVersion("0.11.0"): - pytest.skip("Requires nested parallelism") - - # No requirement, should be us - with joblib.parallel_backend('dask') as (ba, _): - result = joblib.Parallel(n_jobs=2)(joblib.delayed(outer)( - joblib, nested_require=None) for _ in range(1)) - assert isinstance(result[0][0][0], - distributed_joblib.DaskDistributedBackend) - - # Require threads, should be threading - with joblib.parallel_backend('dask') as (ba, _): - result = joblib.Parallel(n_jobs=2)(joblib.delayed(outer)( - joblib, nested_require='sharedmem') for _ in range(1)) - assert isinstance(result[0][0][0], - joblib.parallel.ThreadingBackend) - - -def outer(joblib, nested_require): - return joblib.Parallel(n_jobs=2, prefer='threads')( - joblib.delayed(middle)(joblib, nested_require) for _ in range(1) - ) - - -def middle(joblib, require): - return joblib.Parallel(n_jobs=2, require=require)( - joblib.delayed(inner)(joblib) for _ in range(1) - ) - - -def inner(joblib): - return joblib.parallel.Parallel()._backend - - -def test_secede_with_no_processes(loop, joblib): - # https://github.com/dask/distributed/issues/1775 - - with Client(loop=loop, processes=False, set_as_default=True): - with joblib.parallel_backend('dask'): - joblib.Parallel(n_jobs=4)(joblib.delayed(identity)(i) for i in range(2)) - - -def _test_keywords_f(_): - from distributed import get_worker - return get_worker().address - - -def test_keywords(client, joblib, s, a, b): - with joblib.parallel_backend('dask', workers=a['address']) as (ba, _): - seq = joblib.Parallel()(joblib.delayed(_test_keywords_f)(i) for i in range(10)) - assert seq == [a['address']] * 10 - - with joblib.parallel_backend('dask', workers=b['address']) as (ba, _): - seq = joblib.Parallel()(joblib.delayed(_test_keywords_f)(i) for i in range(10)) - assert seq == [b['address']] * 10 - - -def test_cleanup(loop, joblib): - with Client(processes=False, loop=loop) as client: - with joblib.parallel_backend('dask'): - joblib.Parallel()(joblib.delayed(inc)(i) for i in range(10)) - - start = time() - while client.cluster.scheduler.tasks: - sleep(0.01) - assert time() < start + 5 - - assert not client.futures - - -def test_auto_scatter(loop, joblib): - base_type = joblib._parallel_backends.ParallelBackendBase - if not hasattr(base_type, 'start_call'): - raise pytest.skip('joblib version does not support backend callbacks') - - np = pytest.importorskip('numpy') - data = np.ones(int(1e7), dtype=np.uint8) - - Parallel = joblib.Parallel - delayed = joblib.delayed - - def noop(*args, **kwargs): - pass - - def count_events(event_name, client): - worker_events = client.run(lambda dask_worker: dask_worker.log) - event_counts = {} - for w, events in worker_events.items(): - event_counts[w] = len([event for event in list(events) - if event[1] == event_name]) - return event_counts - - with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop) as client: - with joblib.parallel_backend('dask') as (ba, _): - # Passing the same data as arg and kwarg triggers a single - # scatter operation whose result is reused. - Parallel()(delayed(noop)(data, data, i, opt=data) - for i in range(5)) - # By default large array are automatically scattered with - # broadcast=3 which means that each worker can directly receive - # the data from the scatter operation once. - counts = count_events('receive-from-scatter', client) - assert counts[a['address']] == 1 - assert counts[b['address']] == 1 - - with cluster() as (s, [a, b]): - with Client(s['address'], loop=loop) as client: - with joblib.parallel_backend('dask') as (ba, _): - Parallel()(delayed(noop)(data[:3], i) for i in range(5)) - # Small arrays are passed within the task definition without going - # through a scatter operation. - counts = count_events('receive-from-scatter', client) - assert counts[a['address']] == 0 - assert counts[b['address']] == 0 From 61ebf5ee8782e64f5085ee4903beaa8c52b09c7c Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sun, 7 Oct 2018 18:25:09 -0400 Subject: [PATCH 2/3] err nicely on response['status'] keyerror --- distributed/worker.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 06b38a246c0..9154a268de2 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2919,8 +2919,13 @@ def get_data_from_worker(rpc, keys, worker, who=None, max_connections=None, deserializers=deserializers, op='get_data', keys=keys, who=who, max_connections=max_connections) - if response['status'] == 'OK': - yield comm.write('OK') + try: + status = response['status'] + except KeyError: + raise ValueError("Unexpected response", response) + else: + if status == 'OK': + yield comm.write('OK') finally: rpc.reuse(worker, comm) From 80224c16e859553767b72dd9e79d1554003d05c1 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sun, 7 Oct 2018 18:27:38 -0400 Subject: [PATCH 3/3] skip intermittent test on windows --- distributed/tests/test_resources.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_resources.py b/distributed/tests/test_resources.py index 2d5b1dab8b7..5fa8b1c8a34 100644 --- a/distributed/tests/test_resources.py +++ b/distributed/tests/test_resources.py @@ -8,6 +8,7 @@ from distributed import Worker from distributed.client import wait +from distributed.compatibility import WINDOWS from distributed.utils import tokey from distributed.utils_test import (inc, gen_cluster, slowinc, slowadd) from distributed.utils_test import client, cluster_fixture, loop, s, a, b # noqa: F401 @@ -276,7 +277,7 @@ def test_full_collections(c, s, a, b): @pytest.mark.parametrize('optimize_graph', [ pytest.mark.xfail(True, reason="don't track resources through optimization"), - False + pytest.mark.skipif(WINDOWS, False, reason="intermittent failure"), ]) def test_collections_get(client, optimize_graph, s, a, b): da = pytest.importorskip('dask.array')