diff --git a/providers/celery/provider.yaml b/providers/celery/provider.yaml index 66e2a0534525f..352a414e0a5ce 100644 --- a/providers/celery/provider.yaml +++ b/providers/celery/provider.yaml @@ -148,6 +148,35 @@ config: type: string example: ~ default: "16" + mp_start_method: + description: | + The ``multiprocessing`` start method the ``airflow celery worker`` process uses for the + standard-library ``multiprocessing`` helpers it starts: the log server (``serve_logs``), + the stale-bundle-cleanup process, and the optional ``[secrets] use_cache`` manager. Must + be one of the values returned by ``multiprocessing.get_all_start_methods()`` on your + platform (typically ``fork``, ``forkserver`` or ``spawn``). When unset (the default) it + falls back to ``[core] mp_start_method`` and then to the platform default. + + Python 3.14 changed the Unix default from ``fork`` to ``forkserver``. ``forkserver`` and + ``spawn`` re-import Airflow in each helper and start extra forkserver/resource-tracker + processes, which increases the worker's resident memory; set this to ``fork`` to restore + the pre-3.14 behaviour. This setting governs the standard-library ``multiprocessing`` + helpers only: Celery's ``prefork`` pool is driven by ``billiard`` (a separate fork of + ``multiprocessing``) and always uses ``fork``, so it is unaffected either way. + version_added: ~ + type: string + example: "fork" + default: ~ + mp_forkserver_preload: + description: | + Comma-separated list of modules the ``forkserver`` process should import up front, so the + worker's ``multiprocessing`` helpers inherit them copy-on-write instead of re-importing + them. Only used when the effective ``mp_start_method`` is ``forkserver``. Falls back to + ``[core] mp_forkserver_preload`` when unset. + version_added: ~ + type: string + example: "airflow" + default: ~ worker_autoscale: description: | The maximum and minimum number of pool processes that will be used to dynamically resize diff --git a/providers/celery/src/airflow/providers/celery/cli/celery_command.py b/providers/celery/src/airflow/providers/celery/cli/celery_command.py index 0e2b66aabf10c..3aebe0c20880f 100644 --- a/providers/celery/src/airflow/providers/celery/cli/celery_command.py +++ b/providers/celery/src/airflow/providers/celery/cli/celery_command.py @@ -37,7 +37,11 @@ from airflow import settings from airflow.cli.simple_table import AirflowConsole from airflow.exceptions import AirflowConfigException -from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS +from airflow.providers.celery.version_compat import ( + AIRFLOW_V_3_0_PLUS, + AIRFLOW_V_3_2_PLUS, + AIRFLOW_V_3_3_PLUS, +) from airflow.providers.common.compat.sdk import conf from airflow.utils import cli as cli_utils from airflow.utils.cli import setup_locations @@ -193,6 +197,20 @@ def filter(self, record): @_providers_configuration_loaded def worker(args): """Start Airflow Celery worker.""" + # Apply the configured multiprocessing start method before the worker creates any stdlib + # multiprocessing objects -- the serve_logs and stale-bundle-cleanup helper Processes started + # below, and the optional SecretCache Manager. CPython 3.14 switched the Unix default from fork + # to forkserver (gh-84559); under forkserver those helpers re-import Airflow and spin up extra + # forkserver/resource_tracker processes, inflating the worker's resident memory. Setting + # [celery] mp_start_method = fork (or [core] mp_start_method) restores the pre-3.14 behaviour. + # This governs stdlib multiprocessing only; Celery's prefork pool is driven by billiard, which + # keeps its own fork default and is unaffected. Guarded because set_component_mp_start_method + # only exists on Airflow 3.3+. + if AIRFLOW_V_3_3_PLUS: + from airflow.utils.process_utils import set_component_mp_start_method + + set_component_mp_start_method("celery") + team_config = None if hasattr(args, "team") and args.team: # Multi-team is enabled, create team-specific Celery app and use team based config diff --git a/providers/celery/src/airflow/providers/celery/get_provider_info.py b/providers/celery/src/airflow/providers/celery/get_provider_info.py index ce59a55918f11..b1a663e3495cb 100644 --- a/providers/celery/src/airflow/providers/celery/get_provider_info.py +++ b/providers/celery/src/airflow/providers/celery/get_provider_info.py @@ -75,6 +75,20 @@ def get_provider_info(): "example": None, "default": "16", }, + "mp_start_method": { + "description": "The ``multiprocessing`` start method the ``airflow celery worker`` process uses for the\nstandard-library ``multiprocessing`` helpers it starts: the log server (``serve_logs``),\nthe stale-bundle-cleanup process, and the optional ``[secrets] use_cache`` manager. Must\nbe one of the values returned by ``multiprocessing.get_all_start_methods()`` on your\nplatform (typically ``fork``, ``forkserver`` or ``spawn``). When unset (the default) it\nfalls back to ``[core] mp_start_method`` and then to the platform default.\n\nPython 3.14 changed the Unix default from ``fork`` to ``forkserver``. ``forkserver`` and\n``spawn`` re-import Airflow in each helper and start extra forkserver/resource-tracker\nprocesses, which increases the worker's resident memory; set this to ``fork`` to restore\nthe pre-3.14 behaviour. This setting governs the standard-library ``multiprocessing``\nhelpers only: Celery's ``prefork`` pool is driven by ``billiard`` (a separate fork of\n``multiprocessing``) and always uses ``fork``, so it is unaffected either way.\n", + "version_added": None, + "type": "string", + "example": "fork", + "default": None, + }, + "mp_forkserver_preload": { + "description": "Comma-separated list of modules the ``forkserver`` process should import up front, so the\nworker's ``multiprocessing`` helpers inherit them copy-on-write instead of re-importing\nthem. Only used when the effective ``mp_start_method`` is ``forkserver``. Falls back to\n``[core] mp_forkserver_preload`` when unset.\n", + "version_added": None, + "type": "string", + "example": "airflow", + "default": None, + }, "worker_autoscale": { "description": "The maximum and minimum number of pool processes that will be used to dynamically resize\nthe pool based on load.Enable autoscaling by providing max_concurrency,min_concurrency\nwith the ``airflow celery worker`` command (always keep minimum processes,\nbut grow to maximum if necessary).\nPick these numbers based on resources on worker box and the nature of the task.\nIf autoscale option is available, worker_concurrency will be ignored.\nhttps://docs.celeryq.dev/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-autoscale\n", "version_added": None, diff --git a/providers/celery/tests/unit/celery/cli/test_celery_command.py b/providers/celery/tests/unit/celery/cli/test_celery_command.py index 99a9506f3d625..49495e818a5e1 100644 --- a/providers/celery/tests/unit/celery/cli/test_celery_command.py +++ b/providers/celery/tests/unit/celery/cli/test_celery_command.py @@ -36,7 +36,11 @@ from airflow.providers.common.compat.sdk import conf from tests_common.test_utils.config import conf_vars -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS +from tests_common.test_utils.version_compat import ( + AIRFLOW_V_3_0_PLUS, + AIRFLOW_V_3_2_PLUS, + AIRFLOW_V_3_3_PLUS, +) PY313 = sys.version_info >= (3, 13) @@ -195,6 +199,27 @@ def test_worker_started_with_required_arguments( ] ) + @pytest.mark.skipif( + not AIRFLOW_V_3_3_PLUS, reason="set_component_mp_start_method only exists on Airflow 3.3+" + ) + @mock.patch("airflow.utils.process_utils.set_component_mp_start_method") + @mock.patch("airflow.providers.celery.cli.celery_command.kombu.pools.reset") + @mock.patch("airflow.providers.celery.cli.celery_command.Celery") + @mock.patch("airflow.providers.celery.cli.celery_command.setup_locations") + @mock.patch("airflow.providers.celery.cli.celery_command.Process") + @mock.patch("airflow.providers.celery.executors.celery_executor.app") + def test_worker_applies_celery_mp_start_method( + self, mock_celery_app, mock_popen, mock_locations, mock_celery_cls, mock_pools_reset, mock_set_mp + ): + # The worker pins its stdlib multiprocessing start method (serve_logs / bundle-cleanup / + # SecretCache Manager) from [celery] mp_start_method before spawning any helper process. + mock_locations.return_value = ("pid_file", None, None, None) + args = self.parser.parse_args(["celery", "worker", "--concurrency", "1", "--queues", "queue"]) + + celery_command.worker(args) + + mock_set_mp.assert_called_once_with("celery") + @pytest.mark.backend("mysql", "postgres") @pytest.mark.usefixtures("conf_stale_bundle_cleanup_disabled")