From 3421866c63d2b323f6aa4c66b960f5ceccb11351 Mon Sep 17 00:00:00 2001 From: Michael Spiegel Date: Thu, 30 May 2019 11:58:48 +0200 Subject: [PATCH 1/4] Fix the resource key representation before sending graphs Convert resource key toples to a string representation before they are submitted to the scheduler. The commit is intended to fix #2716. --- distributed/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/client.py b/distributed/client.py index d924b608c61..22d89cda4e4 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -2351,6 +2351,7 @@ def _graph_to_futures( resources = self._expand_resources( resources, all_keys=itertools.chain(dsk, keys) ) + resources = {tokey(k): v for k, v in resources.items()} if retries: retries = self._expand_retries( From 87fe6c371614b6b14fc68d61b3c54bcdbbde505f Mon Sep 17 00:00:00 2001 From: Michael Spiegel Date: Mon, 3 Jun 2019 11:22:45 +0200 Subject: [PATCH 2/4] Adds a test case triggering the resource restriction issue #2716 The test case persists the result of a tiny DataFrame operation and checks the resource restrictions. --- distributed/tests/test_resources.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/distributed/tests/test_resources.py b/distributed/tests/test_resources.py index d7102ef5301..3b68d118634 100644 --- a/distributed/tests/test_resources.py +++ b/distributed/tests/test_resources.py @@ -202,6 +202,23 @@ def test_persist_tuple(c, s, a, b): assert not b.data +@gen_cluster(client=True) +def test_resources_str(c, s, a, b): + pd = pytest.importorskip("pandas") + dd = pytest.importorskip("dask.dataframe") + + a.set_resources({"MyRes": 1}) + x = dd.from_pandas(pd.DataFrame({"A": [1, 2], "B": [3, 4]}), npartitions=1) + y = x.apply(lambda row: row.sum(), axis=1, meta=(None, "int64")) + yy = y.persist(resources={"MyRes": 1}) + yield wait(yy) + + ts_first = s.tasks[tokey(y.__dask_keys__()[0])] + assert ts_first.resource_restrictions == {"MyRes": 1} + ts_last = s.tasks[tokey(y.__dask_keys__()[-1])] + assert ts_last.resource_restrictions == {"MyRes": 1} + + @gen_cluster( client=True, ncores=[ From 4f3b94332891ccdec37e436dac54c64e5505ea3d Mon Sep 17 00:00:00 2001 From: Michael Spiegel Date: Mon, 3 Jun 2019 12:50:08 +0200 Subject: [PATCH 3/4] Fixes the worker resource assignment in a test case --- distributed/tests/test_resources.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_resources.py b/distributed/tests/test_resources.py index 3b68d118634..480532d912e 100644 --- a/distributed/tests/test_resources.py +++ b/distributed/tests/test_resources.py @@ -207,7 +207,8 @@ def test_resources_str(c, s, a, b): pd = pytest.importorskip("pandas") dd = pytest.importorskip("dask.dataframe") - a.set_resources({"MyRes": 1}) + yield a.set_resources(MyRes=1) + x = dd.from_pandas(pd.DataFrame({"A": [1, 2], "B": [3, 4]}), npartitions=1) y = x.apply(lambda row: row.sum(), axis=1, meta=(None, "int64")) yy = y.persist(resources={"MyRes": 1}) From 2c90168a2993affee192f5d28095c2dcf6cef61e Mon Sep 17 00:00:00 2001 From: Michael Spiegel Date: Thu, 30 May 2019 11:58:48 +0200 Subject: [PATCH 4/4] Fix the resource key representation before sending graphs Convert resource key toples to a string representation before they are submitted to the scheduler. The commit is intended to fix #2716. --- distributed/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/client.py b/distributed/client.py index d924b608c61..22d89cda4e4 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -2351,6 +2351,7 @@ def _graph_to_futures( resources = self._expand_resources( resources, all_keys=itertools.chain(dsk, keys) ) + resources = {tokey(k): v for k, v in resources.items()} if retries: retries = self._expand_retries(