Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion task-sdk/src/airflow/sdk/execution_time/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
SchemaVersionMigrator,
get_schema_version_migrator,
)
from airflow.sdk.execution_time.schema.versions import bundle

if TYPE_CHECKING:
from collections.abc import Iterable
Expand Down Expand Up @@ -119,6 +118,18 @@ def resolve_body_class(body: Any) -> type[BaseModel] | None:
return registered_models_by_name().get(name)


def __getattr__(name: str) -> Any:
# Re-export ``bundle`` lazily so importing this package does not import ``cadwyn`` (-> FastAPI/
# Starlette) until something actually accesses ``schema.bundle``. The Task SDK supervisor imports
# this package on every worker, but only the foreign-language-SDK migration path touches the
# bundle, so a pure-Python worker never pays the cadwyn import.
if name == "bundle":
from airflow.sdk.execution_time.schema.versions import get_bundle

return get_bundle()
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


__all__ = [
"SchemaVersionMigrator",
"bundle",
Expand Down
9 changes: 7 additions & 2 deletions task-sdk/src/airflow/sdk/execution_time/schema/migrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from typing import TYPE_CHECKING, Any, cast

import attrs
from cadwyn import generate_versioned_models

if TYPE_CHECKING:
from cadwyn import VersionBundle
Expand Down Expand Up @@ -103,6 +102,11 @@ class SchemaVersionMigrator:

def _versioned_class(self, version: str, model: type[BaseModel]) -> type[BaseModel]:
"""Get the Cadwyn-generated class for *model* at *version*."""
# Imported here, not at module scope, so importing this module (and the parent ``schema``
# package the Task SDK supervisor pulls in) does not import ``cadwyn`` -> FastAPI/Starlette.
# Only the foreign-language-SDK migration path reaches this; a pure-Python worker never does.
from cadwyn import generate_versioned_models

if self._versioned_models is None:
self._versioned_models = generate_versioned_models(self._bundle)
return self._versioned_models[version][model]
Expand Down Expand Up @@ -207,8 +211,9 @@ def get_schema_version_migrator() -> SchemaVersionMigrator:
no per-call state, so concurrent callers can share a single
instance safely.
"""
from airflow.sdk.execution_time.schema.versions import bundle
from airflow.sdk.execution_time.schema.versions import get_bundle

bundle = get_bundle()
return SchemaVersionMigrator(bundle=bundle, supervisor_version=bundle.versions[0].value)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,35 @@

from __future__ import annotations

from cadwyn import HeadVersion, Version, VersionBundle
import functools
from typing import TYPE_CHECKING, Any

bundle = VersionBundle(
HeadVersion(),
Version("2026-06-16"),
)
if TYPE_CHECKING:
from cadwyn import VersionBundle


@functools.cache
def get_bundle() -> VersionBundle:
"""
Build the supervisor schema ``VersionBundle`` lazily.
The ``cadwyn`` import is deferred to here so importing this module -- and the parent ``schema``
package, which the Task SDK supervisor pulls in on every worker -- does not import ``cadwyn``,
which drags in FastAPI/Starlette/Jinja2. The bundle (and its cadwyn machinery) is only needed on
the foreign-language-SDK migration path; a pure-Python worker never builds it. Cached so the
bundle is constructed once per process.
"""
from cadwyn import HeadVersion, Version, VersionBundle

return VersionBundle(
HeadVersion(),
Version("2026-06-16"),
)


def __getattr__(name: str) -> Any:
# Keep ``from ...versions import bundle`` working for existing callers without forcing the
# cadwyn import at module load (only resolves the bundle when ``bundle`` is actually accessed).
if name == "bundle":
return get_bundle()
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
35 changes: 35 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/schema/test_migrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

from __future__ import annotations

import subprocess
import sys
from typing import Literal

import pytest
Expand Down Expand Up @@ -334,3 +336,36 @@ def test_known_discriminator_returns_head_class(self, mock_registry):
def test_extra_fields_in_body_do_not_affect_resolution(self, mock_registry):
body = {"type": "_LangSdkRequest", "ti_id": "t1", "field_a": 7}
assert resolve_body_class(body) is _LangSdkRequest


class TestLazyCadwynImport:
"""``cadwyn`` (which imports FastAPI/Starlette/Jinja2) must stay off the worker import path.
The Task SDK supervisor imports the schema package on every Celery pool worker, but ``cadwyn`` is
only needed on the foreign-language-SDK migration path, so its import is deferred. Each check runs
in a fresh interpreter because ``sys.modules`` is process-global and other tests import cadwyn.
"""

@pytest.mark.parametrize(
"module",
[
"airflow.sdk.execution_time.schema",
"airflow.sdk.execution_time.supervisor",
],
)
def test_importing_worker_path_does_not_load_cadwyn(self, module):
code = (
f"import sys; import {module}; "
"assert 'cadwyn' not in sys.modules, sorted(m for m in sys.modules if 'cadwyn' in m); "
"assert 'fastapi' not in sys.modules, 'fastapi was imported'"
)
subprocess.run([sys.executable, "-c", code], check=True, capture_output=True, text=True)

def test_accessing_bundle_loads_cadwyn(self):
# The foreign-SDK migration path does need cadwyn; accessing the bundle is where it loads.
code = (
"import sys; from airflow.sdk.execution_time.schema import bundle; "
"assert bundle.versions, 'bundle should be built'; "
"assert 'cadwyn' in sys.modules, 'cadwyn should load when the bundle is accessed'"
)
subprocess.run([sys.executable, "-c", code], check=True, capture_output=True, text=True)
Loading