From 7d5ff9a7e427277a4e790f4ee2b0d33980b7585d Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 26 Jun 2026 16:25:18 +0900 Subject: [PATCH] Defer Cadwyn import to keep FastAPI off the Task SDK worker path The Task SDK supervisor imports the execution-time schema package on every worker, and that package imported cadwyn at module load. cadwyn pulls in FastAPI, Starlette and Jinja2, which are only needed on the foreign-language-SDK schema-migration path and never for a pure-Python worker. Because the supervisor is imported lazily after the Celery prefork, each pool worker paid this import in private (non copy-on-write) pages, raising worker memory on Python 3.14. Defer the cadwyn import behind a cached bundle accessor and into _versioned_class so importing the schema package no longer loads FastAPI. The foreign-SDK path is unchanged; a regression test asserts the worker import graph stays cadwyn-free. --- .../sdk/execution_time/schema/__init__.py | 13 ++++++- .../sdk/execution_time/schema/migrator.py | 9 +++-- .../schema/versions/__init__.py | 36 ++++++++++++++++--- .../execution_time/schema/test_migrator.py | 35 ++++++++++++++++++ 4 files changed, 85 insertions(+), 8 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/schema/__init__.py b/task-sdk/src/airflow/sdk/execution_time/schema/__init__.py index 1963e8b8e6614..585bc84a33e2d 100644 --- a/task-sdk/src/airflow/sdk/execution_time/schema/__init__.py +++ b/task-sdk/src/airflow/sdk/execution_time/schema/__init__.py @@ -52,7 +52,6 @@ SchemaVersionMigrator, get_schema_version_migrator, ) -from airflow.sdk.execution_time.schema.versions import bundle if TYPE_CHECKING: from collections.abc import Iterable @@ -119,6 +118,18 @@ def resolve_body_class(body: Any) -> type[BaseModel] | None: return registered_models_by_name().get(name) +def __getattr__(name: str) -> Any: + # Re-export ``bundle`` lazily so importing this package does not import ``cadwyn`` (-> FastAPI/ + # Starlette) until something actually accesses ``schema.bundle``. The Task SDK supervisor imports + # this package on every worker, but only the foreign-language-SDK migration path touches the + # bundle, so a pure-Python worker never pays the cadwyn import. + if name == "bundle": + from airflow.sdk.execution_time.schema.versions import get_bundle + + return get_bundle() + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + __all__ = [ "SchemaVersionMigrator", "bundle", diff --git a/task-sdk/src/airflow/sdk/execution_time/schema/migrator.py b/task-sdk/src/airflow/sdk/execution_time/schema/migrator.py index 9560f6de41905..d2a7a9e305f06 100644 --- a/task-sdk/src/airflow/sdk/execution_time/schema/migrator.py +++ b/task-sdk/src/airflow/sdk/execution_time/schema/migrator.py @@ -33,7 +33,6 @@ from typing import TYPE_CHECKING, Any, cast import attrs -from cadwyn import generate_versioned_models if TYPE_CHECKING: from cadwyn import VersionBundle @@ -103,6 +102,11 @@ class SchemaVersionMigrator: def _versioned_class(self, version: str, model: type[BaseModel]) -> type[BaseModel]: """Get the Cadwyn-generated class for *model* at *version*.""" + # Imported here, not at module scope, so importing this module (and the parent ``schema`` + # package the Task SDK supervisor pulls in) does not import ``cadwyn`` -> FastAPI/Starlette. + # Only the foreign-language-SDK migration path reaches this; a pure-Python worker never does. + from cadwyn import generate_versioned_models + if self._versioned_models is None: self._versioned_models = generate_versioned_models(self._bundle) return self._versioned_models[version][model] @@ -207,8 +211,9 @@ def get_schema_version_migrator() -> SchemaVersionMigrator: no per-call state, so concurrent callers can share a single instance safely. """ - from airflow.sdk.execution_time.schema.versions import bundle + from airflow.sdk.execution_time.schema.versions import get_bundle + bundle = get_bundle() return SchemaVersionMigrator(bundle=bundle, supervisor_version=bundle.versions[0].value) diff --git a/task-sdk/src/airflow/sdk/execution_time/schema/versions/__init__.py b/task-sdk/src/airflow/sdk/execution_time/schema/versions/__init__.py index d8ae4e4580ed5..9491a8993fdc3 100644 --- a/task-sdk/src/airflow/sdk/execution_time/schema/versions/__init__.py +++ b/task-sdk/src/airflow/sdk/execution_time/schema/versions/__init__.py @@ -17,9 +17,35 @@ from __future__ import annotations -from cadwyn import HeadVersion, Version, VersionBundle +import functools +from typing import TYPE_CHECKING, Any -bundle = VersionBundle( - HeadVersion(), - Version("2026-06-16"), -) +if TYPE_CHECKING: + from cadwyn import VersionBundle + + +@functools.cache +def get_bundle() -> VersionBundle: + """ + Build the supervisor schema ``VersionBundle`` lazily. + + The ``cadwyn`` import is deferred to here so importing this module -- and the parent ``schema`` + package, which the Task SDK supervisor pulls in on every worker -- does not import ``cadwyn``, + which drags in FastAPI/Starlette/Jinja2. The bundle (and its cadwyn machinery) is only needed on + the foreign-language-SDK migration path; a pure-Python worker never builds it. Cached so the + bundle is constructed once per process. + """ + from cadwyn import HeadVersion, Version, VersionBundle + + return VersionBundle( + HeadVersion(), + Version("2026-06-16"), + ) + + +def __getattr__(name: str) -> Any: + # Keep ``from ...versions import bundle`` working for existing callers without forcing the + # cadwyn import at module load (only resolves the bundle when ``bundle`` is actually accessed). + if name == "bundle": + return get_bundle() + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/task-sdk/tests/task_sdk/execution_time/schema/test_migrator.py b/task-sdk/tests/task_sdk/execution_time/schema/test_migrator.py index e051d046078ce..05218aded3d3b 100644 --- a/task-sdk/tests/task_sdk/execution_time/schema/test_migrator.py +++ b/task-sdk/tests/task_sdk/execution_time/schema/test_migrator.py @@ -28,6 +28,8 @@ from __future__ import annotations +import subprocess +import sys from typing import Literal import pytest @@ -334,3 +336,36 @@ def test_known_discriminator_returns_head_class(self, mock_registry): def test_extra_fields_in_body_do_not_affect_resolution(self, mock_registry): body = {"type": "_LangSdkRequest", "ti_id": "t1", "field_a": 7} assert resolve_body_class(body) is _LangSdkRequest + + +class TestLazyCadwynImport: + """``cadwyn`` (which imports FastAPI/Starlette/Jinja2) must stay off the worker import path. + + The Task SDK supervisor imports the schema package on every Celery pool worker, but ``cadwyn`` is + only needed on the foreign-language-SDK migration path, so its import is deferred. Each check runs + in a fresh interpreter because ``sys.modules`` is process-global and other tests import cadwyn. + """ + + @pytest.mark.parametrize( + "module", + [ + "airflow.sdk.execution_time.schema", + "airflow.sdk.execution_time.supervisor", + ], + ) + def test_importing_worker_path_does_not_load_cadwyn(self, module): + code = ( + f"import sys; import {module}; " + "assert 'cadwyn' not in sys.modules, sorted(m for m in sys.modules if 'cadwyn' in m); " + "assert 'fastapi' not in sys.modules, 'fastapi was imported'" + ) + subprocess.run([sys.executable, "-c", code], check=True, capture_output=True, text=True) + + def test_accessing_bundle_loads_cadwyn(self): + # The foreign-SDK migration path does need cadwyn; accessing the bundle is where it loads. + code = ( + "import sys; from airflow.sdk.execution_time.schema import bundle; " + "assert bundle.versions, 'bundle should be built'; " + "assert 'cadwyn' in sys.modules, 'cadwyn should load when the bundle is accessed'" + ) + subprocess.run([sys.executable, "-c", code], check=True, capture_output=True, text=True)