Fix orphaned subprocesses and supervisor crash on heartbeat 409#65738
Fix orphaned subprocesses and supervisor crash on heartbeat 409#65738cmettler wants to merge 1 commit into
Conversation
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide
|
When a running TaskInstance is forcibly transitioned out of `running` (e.g. the scheduler resets a stale heartbeat, or an operator PATCHes the state to `failed`), the task-runner's next heartbeat returns HTTP 409 and the supervisor kills the task. Before this change two things went wrong on Linux: 1. Subprocesses the task-runner had spawned (`@task.virtualenv` / `PythonVirtualenvOperator` children, `DockerOperator` exec, Bash shells) were reparented to PID 1 and kept running as orphans until they finished on their own - wasting CPU, RAM and third-party API quota. 2. About 60s later, `_cleanup_open_sockets()` closed the selector while `_service_subprocess()` was still using it, so the supervisor crashed with `ValueError: I/O operation on closed epoll object` (regression from PR apache#51180). The task-runner is now placed in its own session via `os.setsid()` immediately after fork, so its process group ID equals its PID. The supervisor's `kill()` signals the whole group via `os.killpg(os.getpgid(pid), sig)`, which reaches every subprocess the task-runner spawned. Grandchildren without a SIGTERM handler exit promptly, close their inherited pipes, and the supervisor drains `_open_sockets` normally - so `_cleanup_open_sockets()` is never triggered and the selector is never closed mid-loop. `os.killpg`/`os.getpgid` fall back to `self._process.send_signal(sig)` on `ProcessLookupError` or `PermissionError`, preserving prior behaviour when the group has vanished (e.g. the task was already reaped) or permissions are lacking. closes: apache#65505
|
CI failures were unrelated — caused by azure-storage-blob 12.30.0 breaking the WASB SAS-token tests (issue #68482), fixed upstream in #68490. After rebasing onto current main, that fix is now in our sources and CI should pass. PR is ready for review when you have time. Drafted-by: Claude Code (Opus 4.7); reviewed by @cmettler before posting |
| # have been placed in its own session via os.setsid() at fork | ||
| # time (see start()). See issue #65505. | ||
| try: | ||
| os.killpg(os.getpgid(self._process.pid), sig) |
There was a problem hiding this comment.
os.killpg(os.getpgid(self._process.pid), sig) trusts that the child has already run os.setsid(). Two paths break that assumption and make this signal the supervisor's own process group:
setsid()is wrapped inwith suppress(OSError)instart(), so if it ever fails the child stays in the supervisor's group._on_child_startedcallsself.kill(signal.SIGKILL)on any exception fromtask_instances.start()(line 1385).setsid()runs in the forked child and the parent doesn't synchronize on it, so a synchronous failure there can reachkill()before the child has runsetsid(). (A 409 over the network is fine, since the round-trip gives the child time to run it; a local/synchronous failure isn't.)
In both cases os.getpgid(child) returns the supervisor's PGID and os.killpg(..., SIGKILL) hits the supervisor and every sibling in its group. The except (ProcessLookupError, PermissionError) fallback doesn't catch this because nothing is raised. test_child_is_session_leader asserts this exact invariant ("so os.killpg() does not signal the supervisor itself"), but the production path has no guard.
Either set the group race-free from the parent too (with suppress(OSError): os.setpgid(pid, pid) right after the fork) or guard the kill site:
pgid = os.getpgid(self._process.pid)
if pgid == os.getpgid(0):
self._process.send_signal(sig)
else:
os.killpg(pgid, sig)Separately: this group-signal only runs on the kill() path. The graceful path in wait() (_forward_signal -> os.kill(self.pid, signum)) still signals the task-runner alone, so the orphan leak this PR targets persists on graceful SIGTERM (e.g. K8s pod termination). Now that the child is a session leader, that path could use killpg too.
There was a problem hiding this comment.
Thinking about this more: airflow.utils.process_utils.reap_process_group() already implements this whole teardown, and it has the exact guard that's missing here:
if not IS_WINDOWS and process_group_id == os.getpgid(0):
raise RuntimeError("I refuse to kill myself")It also covers what this loop doesn't: SIGTERM -> wait -> SIGKILL escalation via psutil.wait_procs, EPERM -> sudo -n kill for the run_as_user case, and ESRCH (the "child hasn't changed its group yet" race) by falling back to signalling the PID directly.
It lives in airflow-core, and the supervisor keeps its airflow.* imports lazy for worker isolation, so it's not a drop-in import. But rather than hand-rolling a second, less complete version here, should we port/copy reap_process_group (+ its self-group guard) into task-sdk and use that instead? One tested teardown path beats two that can drift.
| # only hits the task-runner and any Popen children are reparented | ||
| # to PID 1 and leak as orphans. See issue #65505. | ||
| with suppress(OSError): | ||
| os.setsid() |
There was a problem hiding this comment.
This setsid() (and the killpg in kill()) lands in the base WatchedSubprocess, and kill() is defined only here with no subclass override, so it applies to every subprocess type (DagFileProcessorProcess, TriggerRunnerSupervisor, CallbackSubprocess), not just the ActivitySubprocess task-runner this PR describes.
The triggerer's long-lived async subprocess installs its own SIGINT/SIGTERM handlers and expects a graceful shutdown; making it a session leader and group-signalling it changes that. Detaching these children into their own session also means a terminal Ctrl-C (foreground-group SIGINT) no longer reaches them directly. The neighbouring use_exec handles exactly this by being opt-in per subclass (its docstring even notes the DAG processor and triggerer as a follow-up).
Suggest the same here: a new_session: bool = False param on start() set True only in ActivitySubprocess, with the killpg branch in kill() gated on it. That keeps the change scoped to the task-runner, and confines the self-signal risk flagged on the killpg line to the one path you've actually tested.
There was a problem hiding this comment.
While you're here: this setsid() satisfies the # TODO: Make this process a session leader at line 420 (top of _fork_main). That TODO is now stale and can be dropped.
There was a problem hiding this comment.
On the scope point: the existing set_new_process_group() (airflow.utils.process_utils) does this with os.setpgid(0, 0) rather than os.setsid(). That still gives a killable process group (so killpg reaches the whole tree) but without creating a new session / detaching the controlling terminal, so it avoids the Ctrl-C / foreground-group change noted above. Its companion reap_process_group() already has the SIGTERM -> SIGKILL escalation and EPERM/ESRCH handling.
Same question as on the kill() thread: should we port/copy these over into task-sdk and use them here instead of a fresh setsid/killpg path? Reusing the existing setpgid group + self-group guard would address both the over-broad scope and the self-signal risk in one go.
There was a problem hiding this comment.
(just thinking out loud -- we can surely do it in separate PR too but wanted to raise so I don't forget either :) )
|
Following up on @ashb’s internal note about the session-leader TODO: Confirmed this PR does make the task-runner a session leader — That makes the pre-existing Could you drop that TODO comment as part of this change? Thanks! |
When a running TaskInstance is forcibly transitioned out of
running(scheduler reset, REST PATCH, etc.), the next heartbeat from the still-running task-runner returns HTTP 409 and the supervisor kills the task. On Linux this produced two bugs:@task.virtualenv,DockerOperator,BashOperator, Cosmos dbt, etc.) was reparented to PID 1 and kept running until it finished on its own, wasting CPU/RAM/API quota._cleanup_open_sockets()closed the selector while_service_subprocess()was still polling it, raisingValueError: I/O operation on closed epoll object(regression from Fix lingering task supervisors whenEOFis missed #51180).Fix
Place the task-runner in its own session via
os.setsid()immediately after fork, then havekill()signal the whole process group viaos.killpg(os.getpgid(pid), sig). This reaches every subprocess the task-runner spawned. Grandchildren without a SIGTERM handler exit promptly and close their inherited pipes, so the supervisor drains_open_socketsnormally and never enters the cleanup-the-selector-mid-loop path.killpg/getpgidfall back toself._process.send_signal(sig)onProcessLookupErrororPermissionError, preserving behaviour when the group has vanished or permissions are lacking.Tests
test_kill_signals_process_group— primary path useskillpg.test_kill_falls_back_to_send_signal_when_group_signal_fails(4 params:{ProcessLookupError, PermissionError} × {getpgid, killpg}).test_child_is_session_leader— real-fork regression: asserts child's PGID == child's PID afterActivitySubprocess.start().os.getpgid/os.killpgexplicitly.uv run --project task-sdk pytest task-sdk/tests/task_sdk/execution_time/test_supervisor.py).closes: #65505
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Opus 4.7 (1M context) following the guidelines
Important
🛠️ Maintainer triage note for @cmettler · by
@potiuk· 2026-06-22 06:31 UTCHelpful heads-up from the maintainers — please address before this PR can be reviewed (see the Pull Request quality criteria):
The ball is in your court — you've been assigned to this PR. Fix the above, then mark it Ready for review.
Automated triage — may be imperfect; a maintainer takes the next look.