Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions providers/celery/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,35 @@ config:
type: string
example: ~
default: "16"
mp_start_method:
description: |
The ``multiprocessing`` start method the ``airflow celery worker`` process uses for the
standard-library ``multiprocessing`` helpers it starts: the log server (``serve_logs``),
the stale-bundle-cleanup process, and the optional ``[secrets] use_cache`` manager. Must
be one of the values returned by ``multiprocessing.get_all_start_methods()`` on your
platform (typically ``fork``, ``forkserver`` or ``spawn``). When unset (the default) it
falls back to ``[core] mp_start_method`` and then to the platform default.

Python 3.14 changed the Unix default from ``fork`` to ``forkserver``. ``forkserver`` and
``spawn`` re-import Airflow in each helper and start extra forkserver/resource-tracker
processes, which increases the worker's resident memory; set this to ``fork`` to restore
the pre-3.14 behaviour. This setting governs the standard-library ``multiprocessing``
helpers only: Celery's ``prefork`` pool is driven by ``billiard`` (a separate fork of
``multiprocessing``) and always uses ``fork``, so it is unaffected either way.
version_added: ~
type: string
example: "fork"
default: ~
mp_forkserver_preload:
description: |
Comma-separated list of modules the ``forkserver`` process should import up front, so the
worker's ``multiprocessing`` helpers inherit them copy-on-write instead of re-importing
them. Only used when the effective ``mp_start_method`` is ``forkserver``. Falls back to
``[core] mp_forkserver_preload`` when unset.
version_added: ~
type: string
example: "airflow"
default: ~
worker_autoscale:
description: |
The maximum and minimum number of pool processes that will be used to dynamically resize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@
from airflow import settings
from airflow.cli.simple_table import AirflowConsole
from airflow.exceptions import AirflowConfigException
from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
from airflow.providers.celery.version_compat import (
AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS,
AIRFLOW_V_3_3_PLUS,
)
from airflow.providers.common.compat.sdk import conf
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations
Expand Down Expand Up @@ -193,6 +197,20 @@ def filter(self, record):
@_providers_configuration_loaded
def worker(args):
"""Start Airflow Celery worker."""
# Apply the configured multiprocessing start method before the worker creates any stdlib
# multiprocessing objects -- the serve_logs and stale-bundle-cleanup helper Processes started
# below, and the optional SecretCache Manager. CPython 3.14 switched the Unix default from fork
# to forkserver (gh-84559); under forkserver those helpers re-import Airflow and spin up extra
# forkserver/resource_tracker processes, inflating the worker's resident memory. Setting
# [celery] mp_start_method = fork (or [core] mp_start_method) restores the pre-3.14 behaviour.
# This governs stdlib multiprocessing only; Celery's prefork pool is driven by billiard, which
# keeps its own fork default and is unaffected. Guarded because set_component_mp_start_method
# only exists on Airflow 3.3+.
if AIRFLOW_V_3_3_PLUS:
from airflow.utils.process_utils import set_component_mp_start_method

Comment thread
jason810496 marked this conversation as resolved.
set_component_mp_start_method("celery")

team_config = None
if hasattr(args, "team") and args.team:
# Multi-team is enabled, create team-specific Celery app and use team based config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ def get_provider_info():
"example": None,
"default": "16",
},
"mp_start_method": {
"description": "The ``multiprocessing`` start method the ``airflow celery worker`` process uses for the\nstandard-library ``multiprocessing`` helpers it starts: the log server (``serve_logs``),\nthe stale-bundle-cleanup process, and the optional ``[secrets] use_cache`` manager. Must\nbe one of the values returned by ``multiprocessing.get_all_start_methods()`` on your\nplatform (typically ``fork``, ``forkserver`` or ``spawn``). When unset (the default) it\nfalls back to ``[core] mp_start_method`` and then to the platform default.\n\nPython 3.14 changed the Unix default from ``fork`` to ``forkserver``. ``forkserver`` and\n``spawn`` re-import Airflow in each helper and start extra forkserver/resource-tracker\nprocesses, which increases the worker's resident memory; set this to ``fork`` to restore\nthe pre-3.14 behaviour. This setting governs the standard-library ``multiprocessing``\nhelpers only: Celery's ``prefork`` pool is driven by ``billiard`` (a separate fork of\n``multiprocessing``) and always uses ``fork``, so it is unaffected either way.\n",
"version_added": None,
"type": "string",
"example": "fork",
"default": None,
},
"mp_forkserver_preload": {
"description": "Comma-separated list of modules the ``forkserver`` process should import up front, so the\nworker's ``multiprocessing`` helpers inherit them copy-on-write instead of re-importing\nthem. Only used when the effective ``mp_start_method`` is ``forkserver``. Falls back to\n``[core] mp_forkserver_preload`` when unset.\n",
"version_added": None,
"type": "string",
"example": "airflow",
"default": None,
},
"worker_autoscale": {
"description": "The maximum and minimum number of pool processes that will be used to dynamically resize\nthe pool based on load.Enable autoscaling by providing max_concurrency,min_concurrency\nwith the ``airflow celery worker`` command (always keep minimum processes,\nbut grow to maximum if necessary).\nPick these numbers based on resources on worker box and the nature of the task.\nIf autoscale option is available, worker_concurrency will be ignored.\nhttps://docs.celeryq.dev/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-autoscale\n",
"version_added": None,
Expand Down
27 changes: 26 additions & 1 deletion providers/celery/tests/unit/celery/cli/test_celery_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@
from airflow.providers.common.compat.sdk import conf

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
from tests_common.test_utils.version_compat import (
AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS,
AIRFLOW_V_3_3_PLUS,
)

PY313 = sys.version_info >= (3, 13)

Expand Down Expand Up @@ -195,6 +199,27 @@ def test_worker_started_with_required_arguments(
]
)

@pytest.mark.skipif(
not AIRFLOW_V_3_3_PLUS, reason="set_component_mp_start_method only exists on Airflow 3.3+"
)
@mock.patch("airflow.utils.process_utils.set_component_mp_start_method")
@mock.patch("airflow.providers.celery.cli.celery_command.kombu.pools.reset")
@mock.patch("airflow.providers.celery.cli.celery_command.Celery")
@mock.patch("airflow.providers.celery.cli.celery_command.setup_locations")
@mock.patch("airflow.providers.celery.cli.celery_command.Process")
@mock.patch("airflow.providers.celery.executors.celery_executor.app")
def test_worker_applies_celery_mp_start_method(
self, mock_celery_app, mock_popen, mock_locations, mock_celery_cls, mock_pools_reset, mock_set_mp
):
# The worker pins its stdlib multiprocessing start method (serve_logs / bundle-cleanup /
# SecretCache Manager) from [celery] mp_start_method before spawning any helper process.
mock_locations.return_value = ("pid_file", None, None, None)
args = self.parser.parse_args(["celery", "worker", "--concurrency", "1", "--queues", "queue"])

celery_command.worker(args)

mock_set_mp.assert_called_once_with("celery")


@pytest.mark.backend("mysql", "postgres")
@pytest.mark.usefixtures("conf_stale_bundle_cleanup_disabled")
Expand Down
Loading