Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion livekit-agents/livekit/agents/ipc/job_thread_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,10 @@ async def aclose(self) -> None:
logger.error("job did not ack shutdown in time", extra=self.logging_extra())

if not self._shutting_down_fut.done():
await self._shutting_down_fut
try:
await asyncio.wait_for(self._shutting_down_fut, timeout=self._opts.close_timeout)
except asyncio.TimeoutError:
logger.error("job did not send ShuttingDown in time", extra=self.logging_extra())

try:
if self._main_atask:
Expand Down Expand Up @@ -327,6 +330,8 @@ async def _monitor_task(self) -> None:
# resolve pending futures when the channel closes
if not self._shutdown_ack_fut.done():
self._shutdown_ack_fut.set_result(None)
if not self._shutting_down_fut.done():
self._shutting_down_fut.set_result(None)

@utils.log_exceptions(logger=logger)
async def _ping_task(self) -> None:
Expand Down
10 changes: 9 additions & 1 deletion livekit-agents/livekit/agents/ipc/supervised_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,15 @@ async def aclose(self) -> None:
await self._send_kill_signal()

if not self._shutting_down_fut.done():
await self._shutting_down_fut
try:
await asyncio.wait_for(self._shutting_down_fut, timeout=self._opts.close_timeout)
except asyncio.TimeoutError:
logger.error(
"process did not send ShuttingDown in time, killing process",
extra=self.logging_extra(),
)
await self._send_dump_signal()
await self._send_kill_signal()

if self._supervise_atask and not self._supervise_atask.done():
try:
Expand Down
265 changes: 265 additions & 0 deletions tests/test_drain_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from livekit.agents.cli.cli import _ExitCli, _run_worker
from livekit.agents.cli.proto import CliArgs
from livekit.agents.ipc.job_thread_executor import ThreadJobExecutor
from livekit.agents.ipc.supervised_proc import SupervisedProc
from livekit.agents.utils import aio
from livekit.agents.worker import AgentServer
Expand Down Expand Up @@ -200,3 +201,267 @@ async def _fake_sleep(_: float) -> None:
asyncio.get_event_loop().run_until_complete(proc._memory_monitor_task())

mock_exception.assert_not_called()


class TestSupervisedProcAcloseDeadlock:
"""Verify that aclose() does not deadlock when _shutting_down_fut never resolves.

In livekit-agents 1.5.2+ (PR #4580), aclose() awaits _shutting_down_fut with
no timeout after the child acks shutdown. If the child gets stuck (e.g.
session.aclose() hangs), the parent blocks forever — orphaning the worker.

The fix wraps that await with wait_for(close_timeout).
"""

def test_aclose_does_not_deadlock_on_shutting_down_fut(self) -> None:
"""aclose() must complete (not hang) when _shutting_down_fut never resolves."""
proc = _make_supervised_proc()
proc._closing = True
proc._pch = AsyncMock() # type: ignore[assignment]
proc._shutdown_ack_fut.set_result(None)
# _shutting_down_fut intentionally left unresolved (child is stuck)

supervise_done: asyncio.Future[None] = asyncio.Future()

async def fake_supervise() -> None:
await supervise_done

proc._supervise_atask = asyncio.ensure_future(fake_supervise())

async def mock_kill() -> None:
if not supervise_done.done():
supervise_done.set_result(None)

async def run_aclose() -> None:
with (
patch(
"livekit.agents.ipc.supervised_proc.channel.asend_message",
new_callable=AsyncMock,
),
patch.object(proc, "_send_kill_signal", side_effect=mock_kill),
patch.object(proc, "_send_dump_signal", new_callable=AsyncMock),
):
await proc.aclose()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(run_aclose(), timeout=10.0))

def test_aclose_kills_process_when_shutting_down_times_out(self) -> None:
"""When _shutting_down_fut times out, aclose must send kill signal."""
proc = _make_supervised_proc()
proc._closing = True
proc._pch = AsyncMock() # type: ignore[assignment]
proc._shutdown_ack_fut.set_result(None)
# _shutting_down_fut intentionally left unresolved

kill_call_count = 0
supervise_done: asyncio.Future[None] = asyncio.Future()

async def fake_supervise() -> None:
await supervise_done

proc._supervise_atask = asyncio.ensure_future(fake_supervise())

async def mock_kill() -> None:
nonlocal kill_call_count
kill_call_count += 1
if not supervise_done.done():
supervise_done.set_result(None)

async def run_aclose() -> None:
with (
patch(
"livekit.agents.ipc.supervised_proc.channel.asend_message",
new_callable=AsyncMock,
),
patch.object(proc, "_send_kill_signal", side_effect=mock_kill),
patch.object(proc, "_send_dump_signal", new_callable=AsyncMock),
):
await proc.aclose()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(run_aclose(), timeout=10.0))

assert kill_call_count >= 1, (
"Expected _send_kill_signal to be called after shutting_down timeout"
)

def test_aclose_proceeds_normally_when_shutting_down_resolves(self) -> None:
"""When _shutting_down_fut resolves promptly, no kill signal is sent."""
proc = _make_supervised_proc()
proc._closing = True
proc._pch = AsyncMock() # type: ignore[assignment]
proc._shutdown_ack_fut.set_result(None)
proc._shutting_down_fut.set_result(None) # resolves immediately

supervise_done: asyncio.Future[None] = asyncio.Future()
supervise_done.set_result(None)

async def fake_supervise() -> None:
await supervise_done

proc._supervise_atask = asyncio.ensure_future(fake_supervise())

kill_called = False

async def mock_kill() -> None:
nonlocal kill_called
kill_called = True

async def run_aclose() -> None:
with (
patch(
"livekit.agents.ipc.supervised_proc.channel.asend_message",
new_callable=AsyncMock,
),
patch.object(proc, "_send_kill_signal", side_effect=mock_kill),
patch.object(proc, "_send_dump_signal", new_callable=AsyncMock),
):
await proc.aclose()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(run_aclose(), timeout=5.0))

assert not kill_called, "Kill should NOT be called when shutting_down resolves in time"


class TestThreadJobExecutorAcloseDeadlock:
"""Verify that ThreadJobExecutor.aclose() does not deadlock on _shutting_down_fut.

ThreadJobExecutor has the same unbounded await pattern as SupervisedProc.
Additionally, its _monitor_task did not resolve _shutting_down_fut on channel
close — making the deadlock even more likely since there's no fallback.
"""

def _make_thread_executor(self, close_timeout: float = 1.0) -> ThreadJobExecutor:
loop = asyncio.get_event_loop()
executor = ThreadJobExecutor(
initialize_process_fnc=lambda _: None,
job_entrypoint_fnc=AsyncMock(),
session_end_fnc=None,
inference_executor=None,
initialize_timeout=10.0,
close_timeout=close_timeout,
session_end_timeout=10.0,
ping_interval=5.0,
high_ping_threshold=2.0,
http_proxy=None,
loop=loop,
)
return executor

def test_aclose_does_not_deadlock_on_shutting_down_fut(self) -> None:
"""aclose() must complete (not hang) when _shutting_down_fut never resolves."""
executor = self._make_thread_executor()
executor._closing = True
executor._pch = AsyncMock() # type: ignore[assignment]
executor._shutdown_ack_fut.set_result(None)
# _shutting_down_fut intentionally left unresolved

main_done: asyncio.Future[None] = asyncio.Future()

async def fake_main() -> None:
await main_done

executor._main_atask = asyncio.ensure_future(fake_main())

async def run_aclose() -> None:
with patch(
"livekit.agents.ipc.job_thread_executor.channel.asend_message",
new_callable=AsyncMock,
):
# _shutting_down_fut will timeout, then main_atask wait will timeout
# Both should be bounded, not hang
main_done.set_result(None)
await executor.aclose()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(run_aclose(), timeout=10.0))

def test_aclose_logs_error_when_shutting_down_times_out(self) -> None:
"""When _shutting_down_fut times out, aclose must log an error."""
executor = self._make_thread_executor()
executor._closing = True
executor._pch = AsyncMock() # type: ignore[assignment]
executor._shutdown_ack_fut.set_result(None)
# _shutting_down_fut intentionally left unresolved

main_done: asyncio.Future[None] = asyncio.Future()
main_done.set_result(None)

async def fake_main() -> None:
await main_done

executor._main_atask = asyncio.ensure_future(fake_main())

async def run_aclose() -> None:
with (
patch(
"livekit.agents.ipc.job_thread_executor.channel.asend_message",
new_callable=AsyncMock,
),
patch("livekit.agents.ipc.job_thread_executor.logger.error") as mock_error,
):
await executor.aclose()
mock_error.assert_any_call(
"job did not send ShuttingDown in time", extra=executor.logging_extra()
)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(run_aclose(), timeout=10.0))

def test_aclose_proceeds_normally_when_shutting_down_resolves(self) -> None:
"""When _shutting_down_fut resolves promptly, no error is logged for it."""
executor = self._make_thread_executor()
executor._closing = True
executor._pch = AsyncMock() # type: ignore[assignment]
executor._shutdown_ack_fut.set_result(None)
executor._shutting_down_fut.set_result(None) # resolves immediately

main_done: asyncio.Future[None] = asyncio.Future()
main_done.set_result(None)

async def fake_main() -> None:
await main_done

executor._main_atask = asyncio.ensure_future(fake_main())

async def run_aclose() -> None:
with (
patch(
"livekit.agents.ipc.job_thread_executor.channel.asend_message",
new_callable=AsyncMock,
),
patch("livekit.agents.ipc.job_thread_executor.logger.error") as mock_error,
):
await executor.aclose()
# Should not have logged "ShuttingDown" timeout error
for call in mock_error.call_args_list:
assert "ShuttingDown" not in str(call), (
"Should not log ShuttingDown timeout when it resolves in time"
)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(run_aclose(), timeout=5.0))

def test_monitor_task_resolves_shutting_down_on_channel_close(self) -> None:
"""_monitor_task must resolve _shutting_down_fut when the channel closes."""
from livekit.agents.utils.aio.duplex_unix import DuplexClosed # noqa: PLC0415

executor = self._make_thread_executor()
executor._pch = AsyncMock() # type: ignore[assignment]

# Simulate channel.arecv_message raising DuplexClosed immediately
with patch(
"livekit.agents.ipc.job_thread_executor.channel.arecv_message",
side_effect=DuplexClosed(),
):
loop = asyncio.get_event_loop()
loop.run_until_complete(executor._monitor_task())

assert executor._shutting_down_fut.done(), (
"_shutting_down_fut must be resolved when channel closes"
)
assert executor._shutdown_ack_fut.done(), (
"_shutdown_ack_fut must be resolved when channel closes"
)
Loading