From 42d17ebcd4face307e3b59c14ef04fa0008dae30 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 17 Nov 2020 20:41:54 +0100 Subject: [PATCH 1/5] __dask_distributed_pack__() now takes a client argument --- distributed/client.py | 2 +- distributed/protocol/highlevelgraph.py | 14 +++++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index bbda46acf4a..5ce872c0641 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -2581,7 +2581,7 @@ def _graph_to_futures( if not isinstance(dsk, HighLevelGraph): dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=()) - dsk = highlevelgraph_pack(dsk, keyset, self, self.futures) + dsk = highlevelgraph_pack(dsk, self, keyset) if isinstance(retries, Number) and retries > 0: retries = {k: retries for k in dsk} diff --git a/distributed/protocol/highlevelgraph.py b/distributed/protocol/highlevelgraph.py index 77de669ebba..b503986421e 100644 --- a/distributed/protocol/highlevelgraph.py +++ b/distributed/protocol/highlevelgraph.py @@ -27,9 +27,8 @@ def _materialized_layer_pack( layer: Layer, all_keys, known_key_dependencies, + client, client_keys, - allowed_client, - allowed_futures, ): from ..client import Future @@ -47,11 +46,11 @@ def _materialized_layer_pack( dsk = {k: unpack_remotedata(v, byte_keys=True) for k, v in layer.items()} unpacked_futures = set.union(*[v[1] for v in dsk.values()]) if dsk else set() for future in unpacked_futures: - if future.client is not allowed_client: + if future.client is not client: raise ValueError( "Inputs contain futures that were created by another client." ) - if tokey(future.key) not in allowed_futures: + if tokey(future.key) not in client.futures: raise CancelledError(tokey(future.key)) unpacked_futures_deps = {} for k, v in dsk.items(): @@ -76,9 +75,7 @@ def _materialized_layer_pack( return {"dsk": dsk, "dependencies": dependencies} -def highlevelgraph_pack( - hlg: HighLevelGraph, client_keys, allowed_client, allowed_futures -): +def highlevelgraph_pack(hlg: HighLevelGraph, client, client_keys): layers = [] # Dump each layer (in topological order) @@ -104,9 +101,8 @@ def highlevelgraph_pack( layer, hlg.get_all_external_keys(), hlg.key_dependencies, + client, client_keys, - allowed_client, - allowed_futures, ), } ) From 32e356fff806e7ba898840c0eb8bf32ebe7418da Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 17 Nov 2020 14:25:52 -0600 Subject: [PATCH 2/5] Temporarily use corresponding dask PR --- continuous_integration/environment-windows.yml | 2 +- continuous_integration/travis/install.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/continuous_integration/environment-windows.yml b/continuous_integration/environment-windows.yml index 5bf1c4903d1..b4eb7f1618a 100644 --- a/continuous_integration/environment-windows.yml +++ b/continuous_integration/environment-windows.yml @@ -29,6 +29,6 @@ dependencies: - fsspec - pip - pip: - - git+https://github.com/dask/dask + - git+https://github.com/madsbk/dask.git@dask_distributed_pack_client_arg - git+https://github.com/joblib/joblib.git - git+https://github.com/dask/zict diff --git a/continuous_integration/travis/install.sh b/continuous_integration/travis/install.sh index d7a54093ce0..0832a3b28e1 100644 --- a/continuous_integration/travis/install.sh +++ b/continuous_integration/travis/install.sh @@ -87,7 +87,7 @@ if [[ $PYTHON != 3.8 ]]; then conda install --no-deps -c conda-forge -c defaults -c numba stacktrace fi -python -m pip install -q git+https://github.com/dask/dask.git --upgrade --no-deps +python -m pip install -q git+https://github.com/madsbk/dask.git@dask_distributed_pack_client_arg --upgrade --no-deps python -m pip install -q git+https://github.com/joblib/joblib.git --upgrade --no-deps python -m pip install -q git+https://github.com/intake/filesystem_spec.git --upgrade --no-deps python -m pip install -q git+https://github.com/dask/s3fs.git --upgrade --no-deps From a908bf5edb0737127fe5b028496cc1a20e031b47 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 17 Nov 2020 15:04:45 -0600 Subject: [PATCH 3/5] Pass client to layer.__dask_distributed_pack__ --- distributed/protocol/highlevelgraph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/protocol/highlevelgraph.py b/distributed/protocol/highlevelgraph.py index b503986421e..512a7abd4b4 100644 --- a/distributed/protocol/highlevelgraph.py +++ b/distributed/protocol/highlevelgraph.py @@ -81,7 +81,7 @@ def highlevelgraph_pack(hlg: HighLevelGraph, client, client_keys): # Dump each layer (in topological order) for layer in (hlg.layers[name] for name in hlg._toposort_layers()): if not layer.is_materialized(): - state = layer.__dask_distributed_pack__() + state = layer.__dask_distributed_pack__(client) if state is not None: layers.append( { From 099aed7fc5c11298c32d9b5e537e78bc4701a392 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 17 Nov 2020 15:57:17 -0600 Subject: [PATCH 4/5] Trigger CI From 8124af8e1e31a04db7e26b8b4cdb262055ae0749 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 17 Nov 2020 21:18:10 -0600 Subject: [PATCH 5/5] Use mainline dask again --- continuous_integration/environment-windows.yml | 2 +- continuous_integration/travis/install.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/continuous_integration/environment-windows.yml b/continuous_integration/environment-windows.yml index b4eb7f1618a..5bf1c4903d1 100644 --- a/continuous_integration/environment-windows.yml +++ b/continuous_integration/environment-windows.yml @@ -29,6 +29,6 @@ dependencies: - fsspec - pip - pip: - - git+https://github.com/madsbk/dask.git@dask_distributed_pack_client_arg + - git+https://github.com/dask/dask - git+https://github.com/joblib/joblib.git - git+https://github.com/dask/zict diff --git a/continuous_integration/travis/install.sh b/continuous_integration/travis/install.sh index 0832a3b28e1..d7a54093ce0 100644 --- a/continuous_integration/travis/install.sh +++ b/continuous_integration/travis/install.sh @@ -87,7 +87,7 @@ if [[ $PYTHON != 3.8 ]]; then conda install --no-deps -c conda-forge -c defaults -c numba stacktrace fi -python -m pip install -q git+https://github.com/madsbk/dask.git@dask_distributed_pack_client_arg --upgrade --no-deps +python -m pip install -q git+https://github.com/dask/dask.git --upgrade --no-deps python -m pip install -q git+https://github.com/joblib/joblib.git --upgrade --no-deps python -m pip install -q git+https://github.com/intake/filesystem_spec.git --upgrade --no-deps python -m pip install -q git+https://github.com/dask/s3fs.git --upgrade --no-deps