diff --git a/distributed/client.py b/distributed/client.py index d924b608c61..22d89cda4e4 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -2351,6 +2351,7 @@ def _graph_to_futures( resources = self._expand_resources( resources, all_keys=itertools.chain(dsk, keys) ) + resources = {tokey(k): v for k, v in resources.items()} if retries: retries = self._expand_retries( diff --git a/distributed/tests/test_resources.py b/distributed/tests/test_resources.py index d7102ef5301..480532d912e 100644 --- a/distributed/tests/test_resources.py +++ b/distributed/tests/test_resources.py @@ -202,6 +202,24 @@ def test_persist_tuple(c, s, a, b): assert not b.data +@gen_cluster(client=True) +def test_resources_str(c, s, a, b): + pd = pytest.importorskip("pandas") + dd = pytest.importorskip("dask.dataframe") + + yield a.set_resources(MyRes=1) + + x = dd.from_pandas(pd.DataFrame({"A": [1, 2], "B": [3, 4]}), npartitions=1) + y = x.apply(lambda row: row.sum(), axis=1, meta=(None, "int64")) + yy = y.persist(resources={"MyRes": 1}) + yield wait(yy) + + ts_first = s.tasks[tokey(y.__dask_keys__()[0])] + assert ts_first.resource_restrictions == {"MyRes": 1} + ts_last = s.tasks[tokey(y.__dask_keys__()[-1])] + assert ts_last.resource_restrictions == {"MyRes": 1} + + @gen_cluster( client=True, ncores=[