Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions chart/files/pod-template-file.kubernetes-helm-yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ spec:
env:
- name: AIRFLOW__CORE__EXECUTOR
value: {{ .Values.executor | quote }}
# Deliver pod-termination signals only to the task supervisor (dumb-init's
# direct child) instead of broadcasting them to the whole process group.
# On graceful pod shutdown the supervisor's warm-shutdown handler then lets
# the running task finish -- the same mechanism Celery workers use -- rather
# than the task subprocess being killed directly by dumb-init's group-wide
# SIGTERM (which could also reach the subprocess before it installs its own
# signal handler). Hard kills (heartbeat loss / overtime / the post-grace
# SIGKILL) are unaffected.
- name: DUMB_INIT_SETSID
value: "0"
Comment thread
Miretpl marked this conversation as resolved.
{{- if or .Values.workers.kubernetes.kerberosSidecar.enabled .Values.workers.kubernetes.kerberosInitContainer.enabled }}
- name: KRB5_CONFIG
value: {{ .Values.kerberos.configPath | quote }}
Expand Down
7 changes: 7 additions & 0 deletions chart/newsfragments/69034.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Default ``DUMB_INIT_SETSID`` changed to ``"0"`` for KubernetesExecutor task pods.

Pod-termination signals (e.g. SIGTERM on graceful shutdown) are now delivered only to the
task supervisor (``dumb-init``'s direct child) instead of being broadcast to the whole
process group. This lets a running task finish via the supervisor's warm-shutdown handler --
the same behaviour Celery worker pods already had -- rather than the task subprocess being
killed directly. Hard kills (heartbeat loss / overtime / the post-grace SIGKILL) are unaffected.
11 changes: 11 additions & 0 deletions chart/tests/helm_tests/airflow_aux/test_pod_template_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,17 @@ def test_should_add_extraEnvs(self):
"valueFrom": {"configMapKeyRef": {"name": "my-config-map", "key": "my-key"}},
} in jmespath.search("spec.containers[0].env", docs[0])

def test_should_set_dumb_init_setsid_for_warm_shutdown(self):
"""Pod-termination signals must reach only the supervisor so a running task can warm-shut-down."""
docs = render_chart(
show_only=["templates/pod-template-file.yaml"],
chart_dir=self.temp_chart_dir,
)

assert {"name": "DUMB_INIT_SETSID", "value": "0"} in jmespath.search(
"spec.containers[0].env", docs[0]
)

def test_should_add_component_specific_labels(self):
docs = render_chart(
values={
Expand Down
9 changes: 9 additions & 0 deletions docker-stack-docs/entrypoint.rst
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,15 @@ The table below summarizes ``DUMB_INIT_SETSID`` possible values and their use ca
| | If you are running it through ``["bash", "-c"]`` command, |
| | you need to start the worker via ``exec airflow celery worker`` |
| | as the last command executed. |
| | |
| | The same applies to KubernetesExecutor task pods. Here ``dumb-init`` |
| | runs as the init process and its direct child is the task |
| | *supervisor*, which supervises a single task subprocess. Setting the |
| | variable to 0 propagates a graceful SIGTERM only to the supervisor, |
| | which then performs a warm shutdown and waits for the running task |
| | to finish, instead of the signal being broadcast to the whole |
| | process group and killing the task subprocess directly. The Airflow |
| | Helm chart sets this on the KubernetesExecutor pod template for you. |
+----------------+----------------------------------------------------------------------+

Additional quick test options
Expand Down
68 changes: 56 additions & 12 deletions task-sdk/src/airflow/sdk/execution_time/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

import contextlib
import functools
import os
import signal
from typing import TYPE_CHECKING, Any

import attrs
Expand All @@ -50,7 +52,7 @@
from airflow.sdk.configuration import conf

if TYPE_CHECKING:
from collections.abc import Mapping
from collections.abc import Generator, Mapping
from os import PathLike

from structlog.typing import FilteringBoundLogger
Expand Down Expand Up @@ -114,6 +116,43 @@ class _CoordinatorSpec(pydantic.BaseModel):
extra: dict[str, Any] | None = None


@contextlib.contextmanager
def _warm_shutdown_signals() -> Generator[None, None, None]:
"""
Install SIGTERM/SIGINT warm-shutdown handlers for the duration of task supervision.

While supervising a task the supervisor must not be torn down by a
termination signal; instead it keeps running so the task can finish (or be
shut down gracefully) and its terminal state and logs are reported. The
handlers are installed around BOTH ``start()`` (which transitions the TI to
RUNNING) and ``wait()`` (which runs the task and then reports the terminal
state / uploads logs), so there is no window where Python's default SIGTERM
disposition could kill the supervisor and tear the just-started task down
with it.

The previous dispositions are restored on exit so a long-lived supervisor
process (e.g. a reused Celery prefork worker) does not leak the handler into
later tasks or clobber the worker's own signal handling.
"""

def _warm_shutdown(signum, frame):
log.info(
"Received signal; warm shutdown in progress, waiting for the running task to complete.",
signal=signal.Signals(signum).name,
pid=os.getpid(),
)

prev_sigterm = signal.getsignal(signal.SIGTERM)
prev_sigint = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGTERM, _warm_shutdown)
signal.signal(signal.SIGINT, _warm_shutdown)
try:
yield
finally:
signal.signal(signal.SIGTERM, prev_sigterm)
signal.signal(signal.SIGINT, prev_sigint)


class _PythonCoordinator(BaseCoordinator):
"""
Coordinator implementation to execute Python tasks.
Expand All @@ -140,17 +179,22 @@ def execute_task(
# process handling.
from airflow.sdk.execution_time.supervisor import ActivitySubprocess

process = ActivitySubprocess.start(
dag_rel_path=dag_rel_path,
what=what,
client=client,
logger=logger,
bundle_info=bundle_info,
subprocess_logs_to_stdout=subprocess_logs_to_stdout,
sentry_integration=sentry_integration,
)
exit_code = process.wait()
return self.ExecutionResult(exit_code, process.final_state)
# Keep the warm-shutdown handlers installed across both start() (which
# transitions the TI to RUNNING) and wait() (which runs the task and
# reports its terminal state / uploads logs) so a SIGTERM at any point
# in this window can't kill the supervisor and tear the task down.
with _warm_shutdown_signals():
process = ActivitySubprocess.start(
dag_rel_path=dag_rel_path,
what=what,
client=client,
logger=logger,
bundle_info=bundle_info,
subprocess_logs_to_stdout=subprocess_logs_to_stdout,
sentry_integration=sentry_integration,
)
exit_code = process.wait()
return self.ExecutionResult(exit_code, process.final_state)


@functools.cache
Expand Down
21 changes: 0 additions & 21 deletions task-sdk/src/airflow/sdk/execution_time/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1392,31 +1392,10 @@ def wait(self) -> int:
if self._exit_code is not None:
return self._exit_code

# Forward termination signals to the task subprocess so the operator's
# on_kill() hook runs on graceful shutdown (e.g. K8s pod SIGTERM).
# Without this the supervisor exits on SIGTERM without notifying the
# child, leaving spawned resources (pods, subprocesses, etc.) running.
prev_sigterm = signal.getsignal(signal.SIGTERM)
prev_sigint = signal.getsignal(signal.SIGINT)

def _forward_signal(signum, frame):
log.info(
"Received signal, forwarding to task subprocess",
signal=signal.Signals(signum).name,
pid=self.pid,
)
with suppress(ProcessLookupError):
os.kill(self.pid, signum)

signal.signal(signal.SIGTERM, _forward_signal)
signal.signal(signal.SIGINT, _forward_signal)

try:
self._monitor_subprocess()
finally:
self.selector.close()
signal.signal(signal.SIGTERM, prev_sigterm)
signal.signal(signal.SIGINT, prev_sigint)

# self._monitor_subprocess() will set the exit code when the process has finished
# If it hasn't, assume it's failed
Expand Down
44 changes: 0 additions & 44 deletions task-sdk/tests/task_sdk/dags/signal_forward_test.py

This file was deleted.

137 changes: 137 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
from __future__ import annotations

import json
import os
import signal
from unittest import mock

import pytest

Expand All @@ -28,6 +31,7 @@
BaseCoordinator,
CoordinatorManager,
_PythonCoordinator,
_warm_shutdown_signals,
get_coordinator_manager,
reset_coordinator_manager,
)
Expand Down Expand Up @@ -229,3 +233,136 @@ def test_every_example_coordinator_constructs(self, sdk_config):
for queue, key in queue_to_coordinator.items():
coordinator = manager.for_queue(queue)
assert isinstance(coordinator, import_string(specs[key]["classpath"]))


class TestWarmShutdownSignals:
"""Tests for the warm-shutdown signal handling that wraps task supervision."""

@pytest.fixture(autouse=True)
def _restore_disposition(self):
"""Guarantee SIGTERM/SIGINT dispositions are restored even if a test leaks one."""
original_term = signal.getsignal(signal.SIGTERM)
original_int = signal.getsignal(signal.SIGINT)
yield
signal.signal(signal.SIGTERM, original_term)
signal.signal(signal.SIGINT, original_int)

def test_installs_handlers_inside_context(self):
"""While the context is active a warm-shutdown handler is installed for both signals."""
sentinel_term = signal.getsignal(signal.SIGTERM)
sentinel_int = signal.getsignal(signal.SIGINT)

with _warm_shutdown_signals():
inside_term = signal.getsignal(signal.SIGTERM)
inside_int = signal.getsignal(signal.SIGINT)

assert callable(inside_term)
assert callable(inside_int)
# The installed handler is the warm-shutdown closure, not the previous disposition.
assert inside_term is not sentinel_term
assert inside_int is not sentinel_int
# Both signals share the same warm-shutdown closure.
assert inside_term is inside_int

def test_restores_previous_dispositions_on_exit(self):
"""The exact previous dispositions are restored when the context exits normally."""

def _prev_term(signum, frame): # pragma: no cover - never invoked
pass

def _prev_int(signum, frame): # pragma: no cover - never invoked
pass

signal.signal(signal.SIGTERM, _prev_term)
signal.signal(signal.SIGINT, _prev_int)

with _warm_shutdown_signals():
pass

assert signal.getsignal(signal.SIGTERM) is _prev_term
assert signal.getsignal(signal.SIGINT) is _prev_int

def test_restores_previous_dispositions_on_exception(self):
"""Dispositions are restored even if the wrapped body raises."""

def _prev_term(signum, frame): # pragma: no cover - never invoked
pass

signal.signal(signal.SIGTERM, _prev_term)

with pytest.raises(RuntimeError, match="boom"), _warm_shutdown_signals():
raise RuntimeError("boom")

assert signal.getsignal(signal.SIGTERM) is _prev_term

def test_sigterm_inside_context_does_not_kill(self):
"""
A SIGTERM delivered while supervising must be swallowed, not kill the process.

This is the regression guard: with the default SIGTERM disposition (SIG_DFL)
in place as the *previous* handler, sending SIGTERM to ourselves would
terminate the process. The warm-shutdown handler installed by the context
manager must absorb it so the running task is allowed to finish.
"""
# Make the pre-context disposition the default so a missing warm-shutdown
# handler would actually kill this process (and fail the test by dying).
signal.signal(signal.SIGTERM, signal.SIG_DFL)

reached_after_signal = False
with _warm_shutdown_signals():
os.kill(os.getpid(), signal.SIGTERM)
# If the handler did not absorb the signal, we never get here.
reached_after_signal = True

assert reached_after_signal
# And the default disposition is put back afterwards.
assert signal.getsignal(signal.SIGTERM) is signal.SIG_DFL


class TestPythonCoordinatorWarmShutdown:
"""The Python coordinator must wrap start() and wait() in the warm-shutdown handlers."""

def test_execute_task_wraps_start_and_wait(self, monkeypatch):
"""
Handlers are installed for the whole start()+wait() window and restored after.

Capturing the SIGTERM disposition at the moment ``start()`` and ``wait()``
run proves the handler spans the RUNNING transition (start) and the
terminal-state report / log upload (wait), with no window left uncovered.
"""
original_term = signal.getsignal(signal.SIGTERM)
captured: dict[str, object] = {}

class _FakeProcess:
final_state = "success"

def wait(self_inner):
captured["wait"] = signal.getsignal(signal.SIGTERM)
return 0

def _fake_start(*args, **kwargs):
captured["start"] = signal.getsignal(signal.SIGTERM)
return _FakeProcess()

import airflow.sdk.execution_time.supervisor as supervisor_mod

monkeypatch.setattr(supervisor_mod.ActivitySubprocess, "start", staticmethod(_fake_start))

coordinator = _PythonCoordinator()
result = coordinator.execute_task(
what=mock.MagicMock(),
dag_rel_path="some_dag.py",
bundle_info=mock.MagicMock(),
client=mock.MagicMock(),
subprocess_logs_to_stdout=False,
)

assert result.exit_code == 0
assert result.final_state == "success"
# During both start() and wait() a warm-shutdown handler was installed...
assert callable(captured["start"])
assert callable(captured["wait"])
assert captured["start"] is not original_term
assert captured["start"] is captured["wait"]
# ...and the original disposition is restored once execute_task returns.
assert signal.getsignal(signal.SIGTERM) is original_term
Loading
Loading