Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion task-sdk/src/airflow/sdk/execution_time/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,16 @@ def start(

pid = os.fork()
if pid == 0:
# Put the task-runner into its own session so its PGID == its own
# PID. The supervisor can then deliver signals to the whole tree
# via os.killpg() in kill(), reaching every subprocess the
# task-runner spawned (e.g. venv children from
# PythonVirtualenvOperator). Without this, a SIGTERM from kill()
# only hits the task-runner and any Popen children are reparented
# to PID 1 and leak as orphans. See issue #65505.
with suppress(OSError):
os.setsid()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This setsid() (and the killpg in kill()) lands in the base WatchedSubprocess, and kill() is defined only here with no subclass override, so it applies to every subprocess type (DagFileProcessorProcess, TriggerRunnerSupervisor, CallbackSubprocess), not just the ActivitySubprocess task-runner this PR describes.

The triggerer's long-lived async subprocess installs its own SIGINT/SIGTERM handlers and expects a graceful shutdown; making it a session leader and group-signalling it changes that. Detaching these children into their own session also means a terminal Ctrl-C (foreground-group SIGINT) no longer reaches them directly. The neighbouring use_exec handles exactly this by being opt-in per subclass (its docstring even notes the DAG processor and triggerer as a follow-up).

Suggest the same here: a new_session: bool = False param on start() set True only in ActivitySubprocess, with the killpg branch in kill() gated on it. That keeps the change scoped to the task-runner, and confines the self-signal risk flagged on the killpg line to the one path you've actually tested.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While you're here: this setsid() satisfies the # TODO: Make this process a session leader at line 420 (top of _fork_main). That TODO is now stale and can be dropped.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the scope point: the existing set_new_process_group() (airflow.utils.process_utils) does this with os.setpgid(0, 0) rather than os.setsid(). That still gives a killable process group (so killpg reaches the whole tree) but without creating a new session / detaching the controlling terminal, so it avoids the Ctrl-C / foreground-group change noted above. Its companion reap_process_group() already has the SIGTERM -> SIGKILL escalation and EPERM/ESRCH handling.

Same question as on the kill() thread: should we port/copy these over into task-sdk and use them here instead of a fresh setsid/killpg path? Reusing the existing setpgid group + self-group guard would address both the over-broad scope and the self-signal risk in one go.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(just thinking out loud -- we can surely do it in separate PR too but wanted to raise so I don't forget either :) )


# Close and delete of the parent end of the sockets.
cls._close_unused_sockets(read_requests, read_stdout, read_stderr, read_logs)

Expand Down Expand Up @@ -1007,7 +1017,18 @@ def kill(

for sig in escalation_path:
try:
self._process.send_signal(sig)
# Signal the whole process group so subprocesses the
# task-runner spawned (venv children, Docker exec, bash
# shells, etc.) are also reached. Requires the task-runner to
# have been placed in its own session via os.setsid() at fork
# time (see start()). See issue #65505.
try:
os.killpg(os.getpgid(self._process.pid), sig)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

os.killpg(os.getpgid(self._process.pid), sig) trusts that the child has already run os.setsid(). Two paths break that assumption and make this signal the supervisor's own process group:

  1. setsid() is wrapped in with suppress(OSError) in start(), so if it ever fails the child stays in the supervisor's group.
  2. _on_child_started calls self.kill(signal.SIGKILL) on any exception from task_instances.start() (line 1385). setsid() runs in the forked child and the parent doesn't synchronize on it, so a synchronous failure there can reach kill() before the child has run setsid(). (A 409 over the network is fine, since the round-trip gives the child time to run it; a local/synchronous failure isn't.)

In both cases os.getpgid(child) returns the supervisor's PGID and os.killpg(..., SIGKILL) hits the supervisor and every sibling in its group. The except (ProcessLookupError, PermissionError) fallback doesn't catch this because nothing is raised. test_child_is_session_leader asserts this exact invariant ("so os.killpg() does not signal the supervisor itself"), but the production path has no guard.

Either set the group race-free from the parent too (with suppress(OSError): os.setpgid(pid, pid) right after the fork) or guard the kill site:

pgid = os.getpgid(self._process.pid)
if pgid == os.getpgid(0):
    self._process.send_signal(sig)
else:
    os.killpg(pgid, sig)

Separately: this group-signal only runs on the kill() path. The graceful path in wait() (_forward_signal -> os.kill(self.pid, signum)) still signals the task-runner alone, so the orphan leak this PR targets persists on graceful SIGTERM (e.g. K8s pod termination). Now that the child is a session leader, that path could use killpg too.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this more: airflow.utils.process_utils.reap_process_group() already implements this whole teardown, and it has the exact guard that's missing here:

if not IS_WINDOWS and process_group_id == os.getpgid(0):
    raise RuntimeError("I refuse to kill myself")

It also covers what this loop doesn't: SIGTERM -> wait -> SIGKILL escalation via psutil.wait_procs, EPERM -> sudo -n kill for the run_as_user case, and ESRCH (the "child hasn't changed its group yet" race) by falling back to signalling the PID directly.

It lives in airflow-core, and the supervisor keeps its airflow.* imports lazy for worker isolation, so it's not a drop-in import. But rather than hand-rolling a second, less complete version here, should we port/copy reap_process_group (+ its self-group guard) into task-sdk and use that instead? One tested teardown path beats two that can drift.

except (ProcessLookupError, PermissionError):
# Group vanished or we lack permission (e.g. task already
# reaped, or the child never reached setsid). Fall back
# to signalling the task-runner alone.
self._process.send_signal(sig)

start = time.monotonic()
end = start + escalation_delay
Expand Down
105 changes: 101 additions & 4 deletions task-sdk/tests/task_sdk/execution_time/test_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,60 @@ def test_cleanup_sockets_after_delay(self, monkeypatch, mocker):
proc.selector.close.assert_called_once()
proc.stdin.close.assert_called_once()

def test_child_is_session_leader(self, client_with_ti_start):
"""Regression test for #65505: after fork, the task-runner child must
call os.setsid() so its PGID equals its own PID. This allows kill()
to reach subprocesses the task-runner spawns via os.killpg(); without
setsid, a venv/Popen child of the task-runner inherits the
supervisor's process group and killpg would signal the supervisor too
(or miss the grandchild entirely).
"""

def subprocess_main():
CommsDecoder()._get_response()
sleep(10)

proc = ActivitySubprocess.start(
dag_rel_path=os.devnull,
bundle_info=FAKE_BUNDLE,
what=TaskInstance(
id=uuid7(),
task_id="b",
dag_id="c",
run_id="d",
try_number=1,
dag_version_id=uuid7(),
queue="default",
),
client=client_with_ti_start,
target=subprocess_main,
)
try:
# Give the child a moment to run setsid() after fork.
deadline = time.monotonic() + 2.0
child_pgid = None
while time.monotonic() < deadline:
try:
child_pgid = os.getpgid(proc.pid)
except ProcessLookupError:
sleep(0.05)
continue
if child_pgid == proc.pid:
break
sleep(0.05)

assert child_pgid == proc.pid, (
"Task-runner child must be its own session/process-group leader "
f"(os.setsid called after fork). Got pgid={child_pgid}, pid={proc.pid}."
)
assert child_pgid != os.getpgid(os.getpid()), (
"Child's process group must differ from the supervisor's so "
"os.killpg() from kill() does not signal the supervisor itself."
)
finally:
proc.kill(signal.SIGKILL, force=True)
proc.wait()


class TestWatchedSubprocessKill:
@pytest.fixture
Expand Down Expand Up @@ -1317,25 +1371,68 @@ def watched_subprocess(self, mocker, mock_process):
proc.selector = mock_selector
return proc

def test_kill_process_already_exited(self, watched_subprocess, mock_process):
def test_kill_process_already_exited(self, watched_subprocess, mock_process, mocker):
"""Test behavior when the process has already exited."""
# When the process is gone, getpgid raises ProcessLookupError and the
# kill() path falls back to send_signal on the dead psutil.Process.
mocker.patch("os.getpgid", side_effect=ProcessLookupError)
mock_process.wait.side_effect = psutil.NoSuchProcess(pid=1234)
watched_subprocess.kill(signal.SIGINT, force=True)

mock_process.send_signal.assert_called_once_with(signal.SIGINT)
mock_process.wait.assert_called_once()
assert watched_subprocess._exit_code == -1

def test_kill_process_custom_signal(self, watched_subprocess, mock_process):
"""Test that the process is killed with the correct signal."""
def test_kill_process_custom_signal(self, watched_subprocess, mock_process, mocker):
"""Test that the process is killed with the correct signal via killpg."""
mock_getpgid = mocker.patch("os.getpgid", return_value=12345)
mock_killpg = mocker.patch("os.killpg")
mock_process.wait.return_value = 0

signal_to_send = signal.SIGUSR1
watched_subprocess.kill(signal_to_send, force=False)

mock_process.send_signal.assert_called_once_with(signal_to_send)
mock_getpgid.assert_called_once_with(12345)
mock_killpg.assert_called_once_with(12345, signal_to_send)
mock_process.send_signal.assert_not_called()
mock_process.wait.assert_called_once_with(timeout=0)

def test_kill_signals_process_group(self, watched_subprocess, mock_process, mocker):
"""Regression test for #65505: kill() must signal the whole process
group so subprocesses spawned by the task-runner (venv children,
Docker exec, bash shells) are also reached.
"""
mock_getpgid = mocker.patch("os.getpgid", return_value=12345)
mock_killpg = mocker.patch("os.killpg")
mock_process.wait.return_value = 0

watched_subprocess.kill(signal.SIGTERM, force=False)

mock_getpgid.assert_called_once_with(12345)
mock_killpg.assert_called_once_with(12345, signal.SIGTERM)
mock_process.send_signal.assert_not_called()

@pytest.mark.parametrize("failing_call", ["getpgid", "killpg"])
@pytest.mark.parametrize("exc", [ProcessLookupError, PermissionError])
def test_kill_falls_back_to_send_signal_when_group_signal_fails(
self, watched_subprocess, mock_process, mocker, failing_call, exc
):
"""If os.killpg or os.getpgid raises ProcessLookupError (group vanished
or child never reached setsid) or PermissionError, fall back to
signalling the task-runner PID directly via send_signal.
"""
if failing_call == "getpgid":
mocker.patch("os.getpgid", side_effect=exc)
mocker.patch("os.killpg")
else:
mocker.patch("os.getpgid", return_value=12345)
mocker.patch("os.killpg", side_effect=exc)
mock_process.wait.return_value = 0

watched_subprocess.kill(signal.SIGTERM, force=False)

mock_process.send_signal.assert_called_once_with(signal.SIGTERM)

@pytest.mark.parametrize(
("signal_to_send", "exit_after"),
[
Expand Down
Loading