Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1542,11 +1542,6 @@ async def _connect(self, addr: str, timeout: float | None = None) -> Comm:
raise
finally:
self._connecting_count -= 1
except asyncio.CancelledError:
current_task = asyncio.current_task()
assert current_task
reason = self._reasons.pop(current_task, "ConnectionPool closing.")
raise CommClosedError(reason)
finally:
self._pending_count -= 1

Expand Down Expand Up @@ -1599,12 +1594,13 @@ def callback(task: asyncio.Task[Comm]) -> None:
except asyncio.CancelledError:
# This is an outside cancel attempt
connect_attempt.cancel()
try:
await connect_attempt
except CommClosedError:
pass
await connect_attempt
raise
return await connect_attempt
try:
return connect_attempt.result()
except asyncio.CancelledError:
reason = self._reasons.pop(connect_attempt, "ConnectionPool closing.")
raise CommClosedError(reason)
Comment on lines +1599 to +1603

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@graingert I'm all ears if you know how to better test this

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added test_remove_cancels_connect_before_task_running, which appears to reproduce this condition 🎉 (not a perfect test, but it's something)


def reuse(self, addr: str, comm: Comm) -> None:
"""
Expand Down
24 changes: 24 additions & 0 deletions distributed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,30 @@ async def remove_address():
assert connect_finished.cancelled()


@gen_test()
async def test_remove_cancels_connect_before_task_running():
loop = asyncio.get_running_loop()
connect_finished = loop.create_future()

async def connect(*args, **kwargs):
await connect_finished
Comment thread
crusaderky marked this conversation as resolved.

async def connect_to_server():
with pytest.raises(CommClosedError, match="Address removed."):
await rpc.connect("tcp://0.0.0.0")
return True

rpc = await ConnectionPool(limit=1)
with mock.patch("distributed.core.connect", connect):
t1 = asyncio.create_task(connect_to_server())
# Cancel the actual connect task before it can even run
while not rpc._connecting:
await asyncio.sleep(0)
rpc.remove("tcp://0.0.0.0")

assert await t1


@gen_test()
async def test_connection_pool_respects_limit():
limit = 5
Expand Down
39 changes: 0 additions & 39 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3326,52 +3326,13 @@ async def test_gather_dep_no_longer_in_flight_tasks(c, s, a):
assert not any("missing-dep" in msg for msg in f2_story)


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_gather_dep_cancelled_error(c, s, a):
"""Something somewhere in the networking stack raises CancelledError while
gather_dep is running

See Also
--------
test_get_data_cancelled_error
https://github.com/dask/distributed/issues/8006
"""
async with BlockedGetData(s.address) as b:
x = c.submit(inc, 1, key="x", workers=[b.address])
y = c.submit(inc, x, key="y", workers=[a.address])
await b.in_get_data.wait()
tasks = {
task for task in asyncio.all_tasks() if "gather_dep" in task.get_name()
}
assert tasks
# There should be only one task but cope with finding more just in case a
# previous test didn't properly clean up
for task in tasks:
task.cancel()

b.block_get_data.set()
assert await y == 3

assert_story(
a.state.story("x"),
[
("x", "fetch", "flight", "flight", {}),
("x", "flight", "missing", "missing", {}),
("x", "missing", "fetch", "fetch", {}),
("x", "fetch", "flight", "flight", {}),
("x", "flight", "memory", "memory", {"y": "ready"}),
],
)
Comment on lines -3329 to -3364

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is too artificial. It triggers a condition that is only possible during worker shutdown. Nothing ever cancels the actual gather_dep task itself unless we're shutting down the worker.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other PR also introduces test_get_data_cancelled_error, which is, I believe, a much more representative test (and which also works on this PR).

The test provokes a (still wrong and artificial) CancelledError on write. This is wrong and, I believe, will never happen. However, due to our low-level handshakes, this CancelledError cancels the actual connect and is therefore kind of representative of the real issue.



@gen_cluster(client=True, nthreads=[("", 1)], timeout=5)
async def test_get_data_cancelled_error(c, s, a):
"""Something somewhere in the networking stack raises CancelledError while
get_data is running

See Also
--------
test_gather_dep_cancelled_error
https://github.com/dask/distributed/issues/8006
"""

Expand Down
6 changes: 1 addition & 5 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2112,11 +2112,7 @@ async def gather_dep(
data=response["data"],
stimulus_id=f"gather-dep-success-{time()}",
)

# Note: CancelledError and asyncio.TimeoutError are rare conditions
# that can be raised by the network stack.
# See https://github.com/dask/distributed/issues/8006
except (OSError, asyncio.CancelledError, asyncio.TimeoutError):
except OSError:
logger.exception("Worker stream died during communication: %s", worker)
self.state.log.append(
("gather-dep-failed", worker, to_gather, stimulus_id, time())
Expand Down