____________________________ test_compute_per_key _____________________________
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:60697', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:60698', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:60700', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True, scheduler_kwargs={"dashboard": True})
async def test_compute_per_key(c, s, a, b):
mbk = ComputePerKey(s)
da = pytest.importorskip("dask.array")
x = (da.ones((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False)
await x
y = await dask.delayed(inc)(1).persist()
z = (x + x.T) - x.mean(axis=0)
await c.compute(z.sum())
mbk.update()
http_client = AsyncHTTPClient()
response = await http_client.fetch(
"http://localhost:%d/individual-compute-time-per-key" % s.http_server.port
)
assert response.code == 200
> assert ("sum-aggregate") in mbk.compute_source.data["names"]
E AssertionError: assert 'sum-aggregate' in ['add', 'mean_chunk', 'sum', 'mean_agg-aggregate', 'ones_like']
e.g. https://github.com/dask/distributed/actions/runs/3153529773/jobs/5130108657 and https://github.com/dask/distributed/actions/runs/3241463745/jobs/5313437018
Backtrace:
Pretty sure this failed assertion is "correct" in that
`z.sum()` is dropped after the call to `c.compute()` completes, so there's no need for the workers to keep it live.