Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

if TYPE_CHECKING:
from collections.abc import Sequence
from multiprocessing.managers import SyncManager

from kubernetes import client
from kubernetes.client import models as k8s
Expand Down Expand Up @@ -101,9 +102,14 @@ def __init__(self, *args, **kwargs):
# Override parallelism with team-aware config value
self.parallelism = self.kube_config.parallelism

self._manager = multiprocessing.Manager()
self.task_queue: Queue[KubernetesJob] = self._manager.JoinableQueue()
self.result_queue: Queue[KubernetesResults] = self._manager.JoinableQueue()
# The multiprocessing.Manager() (and the queues it backs) is only needed once the
# scheduler actually runs the executor, so it is created lazily in start(). Constructing
# the executor without starting it -- as the API server does to call get_task_log() for a
# RUNNING task -- must not spawn a Manager process, otherwise that serve_forever child is
# orphaned and leaks (one per API-server worker).
self._manager: SyncManager | None = None
self.task_queue: Queue[KubernetesJob] | None = None
self.result_queue: Queue[KubernetesResults] | None = None
self.kube_scheduler: AirflowKubernetesScheduler | None = None
self.kube_client: client.CoreV1Api | None = None
self.scheduler_job_id: str | None = None
Expand Down Expand Up @@ -183,6 +189,9 @@ def get_pod_combined_search_str_to_pod_map(self) -> dict[str, k8s.V1Pod]:
def start(self) -> None:
"""Start the executor."""
self.log.info("Start Kubernetes executor")
self._manager = multiprocessing.Manager()
self.task_queue = self._manager.JoinableQueue()
self.result_queue = self._manager.JoinableQueue()
self.scheduler_job_id = str(self.job_id)
self.log.debug("Start with scheduler_job_id: %s", self.scheduler_job_id)
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import (
Expand Down Expand Up @@ -952,10 +961,15 @@ def _flush_result_queue(self) -> None:

def end(self) -> None:
"""Shut down the executor."""
if self._manager is None:
# start() was never called (e.g. the executor was only constructed to read task
# logs), so there is no Manager process or queues to shut down.
return
if TYPE_CHECKING:
assert self.task_queue
assert self.result_queue
assert self.kube_scheduler
assert self._manager

self.log.info("Shutting down Kubernetes executor")
try:
Expand All @@ -976,6 +990,11 @@ def end(self) -> None:
except Exception:
self.log.exception("Unknown error while flushing task queue and result queue.")
self._manager.shutdown()
# Return to the unstarted state so a second end() is a no-op (the guard above) and the
# Manager/queues are recreated cleanly if start() is ever called again.
self._manager = None
self.task_queue = None
self.result_queue = None

def terminate(self):
"""Terminate the executor (currently does nothing)."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,8 @@ def test_run_next_exception_requeue(
kubernetes_executor = self.kubernetes_executor
kubernetes_executor.task_publish_max_retries = task_publish_max_retries
kubernetes_executor.start()
task_queue = kubernetes_executor.task_queue
assert task_queue is not None
try:
# Execute a task while the Api Throws errors
try_number = 1
Expand All @@ -601,7 +603,7 @@ def test_run_next_exception_requeue(
assert mock_kube_client.create_namespaced_pod.call_count == 1 # type: ignore[attr-defined]

if should_requeue:
assert not kubernetes_executor.task_queue.empty()
assert not task_queue.empty()

# Disable the ApiException
if task_expected_state == State.SUCCESS or task_expected_state == State.QUEUED:
Expand All @@ -615,11 +617,11 @@ def test_run_next_exception_requeue(

kubernetes_executor.sync()
assert mock_kube_client.create_namespaced_pod.called # type: ignore[attr-defined]
assert kubernetes_executor.task_queue.empty()
assert task_queue.empty()
if task_expected_state != State.SUCCESS:
assert kubernetes_executor.event_buffer[task_instance_key][0] == task_expected_state
else:
assert kubernetes_executor.task_queue.empty()
assert task_queue.empty()
assert kubernetes_executor.event_buffer[task_instance_key][0] == task_expected_state
finally:
kubernetes_executor.end()
Expand Down Expand Up @@ -1998,6 +2000,72 @@ def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operat
"Reading from k8s pod logs failed: error_fetching_pod_log",
]

def test_init_does_not_create_manager_process(self):
    """
    Constructing the executor must not spawn a ``multiprocessing.Manager``.

    The API server builds a ``KubernetesExecutor`` purely to call ``get_task_log()`` for
    RUNNING tasks and never starts it. Eagerly creating the Manager in ``__init__`` leaked an
    orphaned ``serve_forever`` process per API-server worker.
    """
    executor = KubernetesExecutor()

    # The Manager and both queues are only created by start(), so a freshly constructed
    # executor must hold none of them.
    assert executor._manager is None
    assert executor.task_queue is None
    assert executor.result_queue is None

@pytest.mark.db_test
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_get_task_log_does_not_create_manager_process(
    self, mock_get_kube_client, create_task_instance_of_operator
):
    """Fetching the logs of a running task must leave the executor without a Manager."""
    # Fake a single pod named "x" whose log stream yields exactly one chunk.
    fake_pod = mock.Mock(spec=k8s.V1Pod)
    fake_pod.metadata.name = "x"
    kube_client = mock_get_kube_client.return_value
    kube_client.list_namespaced_pod.return_value.items = [fake_pod]
    kube_client.read_namespaced_pod_log.return_value = [b"a_"]

    task_instance = create_task_instance_of_operator(
        EmptyOperator, dag_id="test_k8s_log_dag", task_id="test_task"
    )

    # Build the executor the way a log reader would: construct it, never start it.
    log_reader = KubernetesExecutor()
    log_reader.get_task_log(ti=task_instance, try_number=1)

    assert log_reader._manager is None

def test_end_without_start_is_noop(self):
    """Calling ``end()`` before ``start()`` must not raise and must leave no Manager behind."""
    never_started = KubernetesExecutor()

    # No Manager or queues exist yet, so there is nothing to tear down; this must not raise.
    never_started.end()

    assert never_started._manager is None

@pytest.mark.skipif(
    AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client")
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
def test_start_creates_manager_and_queues(self, mock_watcher, mock_client, mock_kube_client):
    """``start()`` populates the Manager and both queues; ``end()`` clears them and is repeatable."""
    # Attributes expected to be set by start() and reset to None by end().
    lazy_attrs = ("_manager", "task_queue", "result_queue")

    executor = KubernetesExecutor()
    executor.job_id = 1
    try:
        executor.start()

        for attr in lazy_attrs:
            assert getattr(executor, attr) is not None
    finally:
        # Always tear down so a failed assertion does not leave the executor started.
        executor.end()

    # After end() the executor is back in its unstarted state ...
    for attr in lazy_attrs:
        assert getattr(executor, attr) is None
    # ... so calling end() a second time must be a safe no-op.
    executor.end()

@pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Airflow 3.2+ prefers new configuration")
def test_sentry_integration(self):
assert not KubernetesExecutor.sentry_integration
Expand Down Expand Up @@ -2511,16 +2579,30 @@ def test_executor_with_team_name(self):
assert executor.team_name == "ml_team"
assert executor.kube_config is not None

@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
@pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires Airflow 3.2+")
def test_multiple_team_executors_isolation(self, monkeypatch):
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client")
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
def test_multiple_team_executors_isolation(
self, mock_watcher, mock_client, mock_kube_client, monkeypatch
):
"""Test that multiple team executors can coexist with isolated resources."""
monkeypatch.setenv("AIRFLOW__TEAM_A___KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE", "4")
monkeypatch.setenv("AIRFLOW__TEAM_B___KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE", "8")

team_a_executor = KubernetesExecutor(team_name="team_a")
team_b_executor = KubernetesExecutor(team_name="team_b")
team_a_executor.job_id = 1
team_b_executor.job_id = 2

try:
# Queues are created lazily in start(), so each team executor gets its own.
Comment thread
shahar1 marked this conversation as resolved.
team_a_executor.start()
team_b_executor.start()

assert team_a_executor.task_queue is not team_b_executor.task_queue
assert team_a_executor.result_queue is not team_b_executor.result_queue
assert team_a_executor.running is not team_b_executor.running
Expand Down
Loading