8 changes: 8 additions & 0 deletions distributed/comm/tcp.py
@@ -658,6 +658,14 @@ async def _handle_stream(self, stream, address):
         if stream is None:
             # Preparation failed
             return
+        if self.tcp_server is None:
+            # stop() was called after the connection was accepted, but before this
+            # method could run. abort_handshaking_comms() has already run and won't
+            # take care of this comm; if we left the stream dangling, the client
+            # would hang forever in the comm handshake, which is deliberately not
+            # subject to timeouts (see distributed.comm.core.connect()).
+            stream.close()
+            return


Additional commentary by Claude:

If Listener.stop() runs after a connection has been accepted but before
_handle_stream gets to run, _handle_stream would crash with
ValueError('invalid operation on non-started TCPListener') when reading
self.contact_address, abandoning the accepted stream without closing it.

Since the comm handshake in connect() is deliberately not subject to the
connect timeout (#7698), the client would then hang forever waiting for the
server's handshake reply. abort_handshaking_comms() cannot help, as the comm
never reached on_connection().

This is what deadlocks test_RetireWorker_stress: a worker's gather_dep from a
closing worker gets stuck forever in the handshake, pinning the key in the
flight state; the AMM RetireWorker policy then re-suggests the same transfer
every interval, but the recipient ignores it because the key is already in
flight, so retire_workers never completes. Whenever the race hit without
deadlocking, it still leaked one file descriptor (the abandoned socket).
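
The race can be illustrated with plain asyncio. This is only a minimal sketch under stated assumptions: `SketchListener`, `client_handshake` and `demo` are hypothetical names, and none of this is distributed's actual `TCPListener` code. It shows that closing the accepted stream when the listener has already been stopped lets a client that reads without a timeout see EOF straight away instead of hanging.

```python
import asyncio


class SketchListener:
    """Hypothetical stand-in for a TCP listener (not distributed's TCPListener)."""

    def __init__(self):
        self.server = None

    async def start(self):
        # Port 0 asks the OS for any free port; return the one we got.
        self.server = await asyncio.start_server(self._handle, "127.0.0.1", 0)
        return self.server.sockets[0].getsockname()[1]

    def stop(self):
        server, self.server = self.server, None
        if server is not None:
            server.close()

    async def _handle(self, reader, writer):
        # Simulate the race: stop() lands after accept, before handling starts.
        self.stop()
        if self.server is None:
            # Close the accepted stream so the peer sees EOF instead of hanging.
            writer.close()
            return
        writer.write(b"handshake")
        await writer.drain()


async def client_handshake(port):
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    try:
        # Deliberately no timeout on the read, mirroring the handshake above.
        return await reader.read(100)
    finally:
        writer.close()


async def demo():
    listener = SketchListener()
    port = await listener.start()
    # wait_for is only a safety net for this sketch; with the guard in place
    # the read returns b"" (EOF) almost immediately.
    return await asyncio.wait_for(client_handshake(port), timeout=5)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Without the `writer.close()` guard, the handler in this sketch would return while leaving the socket open, and the client's read would only end when the safety-net timeout fired.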

         logger.debug("Incoming connection from %r to %r", address, self.contact_address)
         local_address = self.prefix + get_stream_address(stream)
         comm = self.comm_class(stream, local_address, address, self.deserialize)
20 changes: 20 additions & 0 deletions distributed/comm/tests/test_comms.py
@@ -989,6 +989,26 @@ def get_connector(self):
listener.stop()


+@gen_test()
+async def test_stop_listener_during_handle_stream(tcp):
+    """The listener is stopped after a connection has been accepted, but before the
+    server could start handling it. The accepted stream must be closed, so that the
+    client fails fast instead of hanging forever in the comm handshake, which is
+    deliberately not subject to the connect timeout (see test_handshake_slow_comm).
+    """
+    listener = await listen("tcp://127.0.0.1", echo)
+    orig_handle_stream = listener._handle_stream
+
+    async def stop_then_handle_stream(stream, address):
+        listener.stop()
+        await orig_handle_stream(stream, address)
+
+    listener.tcp_server.handle_stream = stop_then_handle_stream
+
+    with pytest.raises(CommClosedError):
+        await wait_for(connect(listener.contact_address), timeout=5)
+
+
 async def check_connect_timeout(addr):
     t1 = time()
     with pytest.raises(IOError):
15 changes: 13 additions & 2 deletions distributed/tests/test_active_memory_manager.py
@@ -1296,9 +1296,19 @@ async def tensordot_stress(c, s):
break
     else:
         raise RuntimeError("Expected 'update_graph' event not found")
-    # Test that we didn't recompute any tasks during the stress test
+    # Test that we didn't recompute any tasks during the stress test.
+    # Exception: when a worker is retired, any tasks that completed on it between the
+    # moment the AMM RetireWorker policy measured that no unique keys were left on it
+    # and the moment the worker was actually removed are lost and will be recomputed
+    # elsewhere (see RetireWorker.done).
     await async_poll_for(lambda: not s.tasks)
-    assert sum(t.start == "memory" for t in s.transition_log) == expected_tasks
+    lost = sum(
+        len(msg["lost-computed-tasks"])
+        for _, msg in await c.get_events("all")
+        if msg["action"] == "remove-worker" and msg["expected"]
+    )
+    actual = sum(t.start == "memory" for t in s.transition_log)
+    assert expected_tasks <= actual <= expected_tasks + lost


Additional commentary by Claude:

Tasks that complete on a retiring worker between the moment the AMM
RetireWorker policy observes that no unique keys are left on it and the moment
the worker is removed are lost and recomputed elsewhere. This is a documented
design decision (see RetireWorker.done), so the stress test must not treat
these recomputes as failures. Count them through the remove-worker events and
loosen the transition_log assertion accordingly.

This change fixes the rare assert 1641 == 1638 flavour of CI failures, distinct from the
deadlock fixed in the previous commit.
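
The loosened bound can be sketched in isolation as follows. This is a minimal illustration, not the test's code: `recompute_bounds` is a hypothetical helper, and the event list is made-up sample data shaped like the `(timestamp, message)` pairs the test iterates over.

```python
def recompute_bounds(events, expected_tasks):
    """Return (low, high) bounds for the number of transitions out of "memory".

    Only expected worker removals (i.e. retirements) may contribute lost,
    recomputed tasks to the upper bound.
    """
    lost = sum(
        len(msg["lost-computed-tasks"])
        for _, msg in events
        if msg["action"] == "remove-worker" and msg["expected"]
    )
    return expected_tasks, expected_tasks + lost


# Hypothetical sample events; keys mirror those read in the test above.
events = [
    (0.0, {"action": "update_graph"}),
    (1.0, {"action": "remove-worker", "expected": True,
           "lost-computed-tasks": ["x", "y"]}),
    (2.0, {"action": "remove-worker", "expected": False,
           "lost-computed-tasks": ["z"]}),
    (3.0, {"action": "remove-worker", "expected": True,
           "lost-computed-tasks": ["w"]}),
]

low, high = recompute_bounds(events, expected_tasks=1638)
actual = 1641  # the count seen in the failing CI runs
assert low <= actual <= high
```

With three tasks lost on expected removals, a count of 1641 against 1638 expected tasks falls inside the accepted range, whereas the old strict equality would have failed. The unexpected removal in the sample data is ignored on purpose.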



@pytest.mark.slow
@@ -1372,6 +1382,7 @@ async def test_ReduceReplicas_stress(c, s, *workers):
     },
     scheduler_kwargs={"transition_counter_max": 500_000},
     worker_kwargs={"transition_counter_max": 500_000},
+    timeout=180,  # Normally runs in ~5s, but has been observed to take up to 48s
 )
 async def test_RetireWorker_stress(c, s, *workers, use_ReduceReplicas):
     """It is safe to retire the best part of a cluster in the middle of a computation"""