From ecdcdecfbf5e51b344e7fd0947cdaf3e6de124ea Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Wed, 28 Jan 2026 03:46:56 -0500 Subject: [PATCH 01/11] feat: add note to dagrun init --- airflow-core/src/airflow/models/dagrun.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index c5eb3a7087bb7..ca96283d80293 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -328,6 +328,7 @@ def __init__( backfill_id: NonNegativeInt | None = None, bundle_version: str | None = None, partition_key: str | None = None, + note: str | None = None, ): # For manual runs where logical_date is None, ensure no data_interval is set. if logical_date is None and data_interval is not None: @@ -348,6 +349,7 @@ def __init__( self.run_after = run_after self.start_date = start_date self.conf = conf or {} + self.note = note if state is not None: self.state = state if not is_arg_set(queued_at): From 59eee39fbcc010f5bccb8b145bbb622b1d543f36 Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Wed, 28 Jan 2026 03:47:44 -0500 Subject: [PATCH 02/11] feat: add note to DagRunTriggerException --- task-sdk/src/airflow/sdk/exceptions.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/task-sdk/src/airflow/sdk/exceptions.py b/task-sdk/src/airflow/sdk/exceptions.py index 92185df5cedf6..9f8a5f11fbf8e 100644 --- a/task-sdk/src/airflow/sdk/exceptions.py +++ b/task-sdk/src/airflow/sdk/exceptions.py @@ -248,6 +248,7 @@ def __init__( failed_states: list[str], poke_interval: int, deferrable: bool, + note: str | None = None, ): super().__init__() self.trigger_dag_id = trigger_dag_id @@ -261,6 +262,7 @@ def __init__( self.failed_states = failed_states self.poke_interval = poke_interval self.deferrable = deferrable + self.note = note class DownstreamTasksSkipped(AirflowException): From 47e23db9feb00ed65f29f53f8036a55e1386ae98 Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Wed, 28 Jan 2026 03:48:53 -0500 Subject: [PATCH 03/11] feat: add note support to TriggerDagRunOperator --- .../providers/standard/operators/trigger_dagrun.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index ae3f978da4349..2ec8c292878dc 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -18,6 +18,7 @@ from __future__ import annotations import datetime +import inspect import json import time from collections.abc import Sequence @@ -179,6 +180,7 @@ def __init__( failed_states: list[str | DagRunState] | None = None, skip_when_already_exists: bool = False, fail_when_dag_is_paused: bool = False, + note: str | None = None, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), openlineage_inject_parent_info: bool = True, **kwargs, @@ -201,6 +203,7 @@ def __init__( self.skip_when_already_exists = skip_when_already_exists self.fail_when_dag_is_paused = fail_when_dag_is_paused self.openlineage_inject_parent_info = openlineage_inject_parent_info + self.note = note self.deferrable = deferrable self.logical_date = logical_date if logical_date is NOTSET: @@ -274,7 +277,7 @@ def execute(self, context: Context): def _trigger_dag_af_3(self, context, run_id, parsed_logical_date): from airflow.providers.common.compat.sdk import DagRunTriggerException - raise DagRunTriggerException( + kwargs_accepted = dict( trigger_dag_id=self.trigger_dag_id, dag_run_id=run_id, conf=self.conf, @@ -288,6 +291,13 @@ def _trigger_dag_af_3(self, context, run_id, parsed_logical_date): deferrable=self.deferrable, ) + if self.note: + sig = inspect.signature(DagRunTriggerException.__init__) + if "note" in sig.parameters: + kwargs_accepted["note"] = self.note + + raise DagRunTriggerException(**kwargs_accepted) + def _trigger_dag_af_2(self, context, run_id, parsed_logical_date): try: dag_run = trigger_dag( From 102bca8c853d3486bec5701b1ca52b82ef21f4db Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Wed, 28 Jan 2026 03:49:49 -0500 Subject: [PATCH 04/11] feat: add support to task_runner and supervisor --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 6 +----- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 1 + 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index b87131aa7336d..b0878f766e2af 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1371,11 +1371,7 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: dump_opts = {"exclude_unset": True} elif isinstance(msg, TriggerDagRun): resp = self.client.dag_runs.trigger( - msg.dag_id, - msg.run_id, - msg.conf, - msg.logical_date, - msg.reset_dag_run, + msg.dag_id, msg.run_id, msg.conf, msg.logical_date, msg.reset_dag_run, msg.note ) elif isinstance(msg, GetDagRun): dr_resp = self.client.dag_runs.get_detail(msg.dag_id, msg.run_id) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 9fa13a08d36a0..3e2dbf727e894 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1344,6 +1344,7 @@ def _handle_trigger_dag_run( logical_date=drte.logical_date, conf=drte.conf, reset_dag_run=drte.reset_dag_run, + note=drte.note, ), ) From 3bd77514985658413cda07f34b3f0fcdb1c9c425 Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Wed, 28 Jan 2026 03:55:13 -0500 Subject: [PATCH 05/11] feat: add note TriggerDAGRunPayload # Conflicts: # airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py # task-sdk/src/airflow/sdk/api/datamodels/_generated.py --- .../src/airflow/api_fastapi/execution_api/datamodels/dagrun.py | 1 + task-sdk/src/airflow/sdk/api/datamodels/_generated.py | 1 + 2 files changed, 2 insertions(+) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py index 32b0b7fead07b..8619901f717f7 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py @@ -31,6 +31,7 @@ class TriggerDAGRunPayload(StrictBaseModel): conf: dict = Field(default_factory=dict) reset_dag_run: bool = False partition_key: str | None = None + note: str | None = None class DagRunStateResponse(BaseModel): diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index 3c2099ffbf5b0..1e984796328de 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -356,6 +356,7 @@ class TriggerDAGRunPayload(BaseModel): conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None reset_dag_run: Annotated[bool | None, Field(title="Reset Dag Run")] = False partition_key: Annotated[str | None, Field(title="Partition Key")] = None + note: Annotated[str | None, Field(title="Note")] = None class UpdateHITLDetailPayload(BaseModel): From 28b3611699408692146c77a03b38c87ba896570c Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Wed, 28 Jan 2026 03:56:42 -0500 Subject: [PATCH 06/11] feat: handle note info ^ Conflicts: ^ airflow-core/src/airflow/api/common/trigger_dag.py ^ airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py --- airflow-core/src/airflow/api/common/trigger_dag.py | 4 ++++ .../src/airflow/api_fastapi/execution_api/routes/dag_runs.py | 1 + airflow-core/src/airflow/serialization/definitions/dag.py | 4 ++++ task-sdk/src/airflow/sdk/api/client.py | 5 ++++- 4 files changed, 13 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api/common/trigger_dag.py b/airflow-core/src/airflow/api/common/trigger_dag.py index 77912f58d2637..5b21d4a8c25a2 100644 --- a/airflow-core/src/airflow/api/common/trigger_dag.py +++ b/airflow-core/src/airflow/api/common/trigger_dag.py @@ -50,6 +50,7 @@ def _trigger_dag( conf: dict | str | None = None, logical_date: datetime | None = None, replace_microseconds: bool = True, + note: str | None = None, partition_key: str | None = None, session: Session = NEW_SESSION, ) -> DagRun | None: @@ -118,6 +119,7 @@ def _trigger_dag( run_type=DagRunType.MANUAL, triggered_by=triggered_by, triggering_user_name=triggering_user_name, + note=note, state=DagRunState.QUEUED, partition_key=partition_key, session=session, @@ -137,6 +139,7 @@ def trigger_dag( conf: dict | str | None = None, logical_date: datetime | None = None, replace_microseconds: bool = True, + note: str | None = None, partition_key: str | None = None, session: Session = NEW_SESSION, ) -> DagRun | None: @@ -169,6 +172,7 @@ def trigger_dag( replace_microseconds=replace_microseconds, triggered_by=triggered_by, triggering_user_name=triggering_user_name, + note=note, partition_key=partition_key, session=session, ) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py index f0af063b76fb7..028409c241b35 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py @@ -123,6 +123,7 @@ def trigger_dag_run( triggered_by=DagRunTriggeredByType.OPERATOR, replace_microseconds=False, partition_key=payload.partition_key, + note=payload.note, session=session, ) except DagRunAlreadyExists: diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py index b1a2dc8da594a..ac83aac52dc23 100644 --- a/airflow-core/src/airflow/serialization/definitions/dag.py +++ b/airflow-core/src/airflow/serialization/definitions/dag.py @@ -500,6 +500,7 @@ def create_dagrun( creating_job_id: int | None = None, backfill_id: NonNegativeInt | None = None, partition_key: str | None = None, + note: str | None = None, session: Session = NEW_SESSION, ) -> DagRun: """ @@ -583,6 +584,7 @@ def create_dagrun( triggered_by=triggered_by, triggering_user_name=triggering_user_name, partition_key=partition_key, + note=note, session=session, ) @@ -1111,6 +1113,7 @@ def _create_orm_dagrun( triggered_by: DagRunTriggeredByType, triggering_user_name: str | None = None, partition_key: str | None = None, + note: str | None = None, session: Session = NEW_SESSION, ) -> DagRun: bundle_version = None @@ -1138,6 +1141,7 @@ def _create_orm_dagrun( backfill_id=backfill_id, bundle_version=bundle_version, partition_key=partition_key, + note=note, ) # Load defaults into the following two fields to ensure result can be serialized detached max_log_template_id = session.scalar(select(func.max(LogTemplate.__table__.c.id))) diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py index 09cd6520ebbb7..e95a624da0b33 100644 --- a/task-sdk/src/airflow/sdk/api/client.py +++ b/task-sdk/src/airflow/sdk/api/client.py @@ -695,9 +695,12 @@ def trigger( conf: dict | None = None, logical_date: datetime | None = None, reset_dag_run: bool = False, + note: str | None = None, ) -> OKResponse | ErrorResponse: """Trigger a Dag run via the API server.""" - body = TriggerDAGRunPayload(logical_date=logical_date, conf=conf or {}, reset_dag_run=reset_dag_run) + body = TriggerDAGRunPayload( + logical_date=logical_date, conf=conf or {}, reset_dag_run=reset_dag_run, note=note + ) try: self.client.post( From 278d32c9f1cf9d9bf79a01204030aa8453e0152f Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Fri, 30 Jan 2026 03:27:17 -0500 Subject: [PATCH 07/11] feat: add log warning to _trigger_dag_af_2 --- .../providers/standard/operators/trigger_dagrun.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index 2ec8c292878dc..26b990f9c9e3e 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -291,15 +291,16 @@ def _trigger_dag_af_3(self, context, run_id, parsed_logical_date): deferrable=self.deferrable, ) - if self.note: - sig = inspect.signature(DagRunTriggerException.__init__) - if "note" in sig.parameters: - kwargs_accepted["note"] = self.note + if self.note and "note" in inspect.signature(DagRunTriggerException.__init__).parameters: + kwargs_accepted["note"] = self.note raise DagRunTriggerException(**kwargs_accepted) def _trigger_dag_af_2(self, context, run_id, parsed_logical_date): try: + if self.note: + self.log.warning("Parameter 'note' is not supported in Airflow 2.x and will be ignored.") + dag_run = trigger_dag( dag_id=self.trigger_dag_id, run_id=run_id, From fe19338047e35a07603cd88755f96582a33e4d5b Mon Sep 17 00:00:00 2001 From: Arnold Lin Date: Sat, 7 Feb 2026 17:15:02 -0500 Subject: [PATCH 08/11] feat:add cadwyn migration for TIRunContext --- .../execution_api/datamodels/taskinstance.py | 1 + .../execution_api/versions/__init__.py | 3 ++- .../execution_api/versions/v2026_03_31.py | 20 ++++++++++++++++++- .../airflow/sdk/api/datamodels/_generated.py | 1 + 4 files changed, 23 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index 513a99f6dc996..d0d86ca89818f 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -304,6 +304,7 @@ class DagRun(StrictBaseModel): triggering_user_name: str | None = None consumed_asset_events: list[AssetEventDagRunReference] partition_key: str | None + note: str | None = None class TIRunContext(BaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index 30d4159f7453c..c9f4a2f9b669f 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -34,13 +34,14 @@ MovePreviousRunEndpoint, ) from airflow.api_fastapi.execution_api.versions.v2026_03_31 import ( + AddNoteField, ModifyDeferredTaskKwargsToJsonValue, RemoveUpstreamMapIndexesField, ) bundle = VersionBundle( HeadVersion(), - Version("2026-03-31", ModifyDeferredTaskKwargsToJsonValue, RemoveUpstreamMapIndexesField), + Version("2026-03-31", ModifyDeferredTaskKwargsToJsonValue, RemoveUpstreamMapIndexesField, AddNoteField), Version("2025-12-08", MovePreviousRunEndpoint, AddDagRunDetailEndpoint), Version("2025-11-07", AddPartitionKeyField), Version("2025-11-05", AddTriggeringUserNameField), diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py index 72e193426dac6..e592296cf31f4 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py @@ -21,7 +21,11 @@ from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema -from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIDeferredStatePayload, TIRunContext +from airflow.api_fastapi.execution_api.datamodels.taskinstance import ( + DagRun, + TIDeferredStatePayload, + TIRunContext, +) class ModifyDeferredTaskKwargsToJsonValue(VersionChange): @@ -50,3 +54,17 @@ class RemoveUpstreamMapIndexesField(VersionChange): def add_upstream_map_indexes_field(response: ResponseInfo) -> None: # type: ignore[misc] """Add upstream_map_indexes field with None for older API versions.""" response.body["upstream_map_indexes"] = None + + +class AddNoteField(VersionChange): + """Add note parameter to DagRun Model.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = (schema(DagRun).field("note").didnt_exist,) + + @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] + def remove_note_field(response: ResponseInfo) -> None: # type: ignore[misc] + """Remove note field for older API versions.""" + if "dag_run" in response.body and isinstance(response.body["dag_run"], dict): + response.body["dag_run"].pop("note", None) diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index 1e984796328de..35b10f6e6d700 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -628,6 +628,7 @@ class DagRun(BaseModel): triggering_user_name: Annotated[str | None, Field(title="Triggering User Name")] = None consumed_asset_events: Annotated[list[AssetEventDagRunReference], Field(title="Consumed Asset Events")] partition_key: Annotated[str | None, Field(title="Partition Key")] = None + note: Annotated[str | None, Field(title="Note")] = None class TIRunContext(BaseModel): From 59f614b128bf32ac402db854ce698bf1604ac04e Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Thu, 12 Feb 2026 13:12:19 +0800 Subject: [PATCH 09/11] Fix ci error at Pydantic level instead of SQLA --- .../execution_api/datamodels/taskinstance.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index d0d86ca89818f..b66910923f5d7 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -29,6 +29,7 @@ Tag, TypeAdapter, WithJsonSchema, + model_validator, ) from airflow.api_fastapi.common.types import UtcDateTime @@ -306,6 +307,26 @@ class DagRun(StrictBaseModel): partition_key: str | None note: str | None = None + @model_validator(mode="before") + @classmethod + def extract_dag_run_note(cls, data: Any) -> Any: + """Extract note content from dag_run_note relationship to avoid DetachedInstanceError.""" + if isinstance(data, dict): + return data + # For SQLAlchemy model, extract note from relationship while still attached + if hasattr(data, "dag_run_note"): + dag_run_note = data.dag_run_note + note_value = dag_run_note.content if dag_run_note else None + # Convert to dict to avoid further lazy loading issues + values = { + field_name: getattr(data, field_name, None) + for field_name in cls.model_fields + if field_name != "note" + } + values["note"] = note_value + return values + return data + class TIRunContext(BaseModel): """Response schema for TaskInstance run context.""" From 7d28af3e295a5fd1ec007b517747c052112aec96 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Thu, 12 Feb 2026 17:38:35 +0800 Subject: [PATCH 10/11] Use sa_inspect to avoid lazy load on detached instance --- .../execution_api/datamodels/taskinstance.py | 45 +++++++++++++------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index b66910923f5d7..0aa216a31a511 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -20,7 +20,7 @@ from collections.abc import Iterable from datetime import timedelta from enum import Enum -from typing import Annotated, Any, Literal +from typing import TYPE_CHECKING, Annotated, Any, Literal from pydantic import ( AwareDatetime, @@ -45,6 +45,9 @@ ) from airflow.utils.types import DagRunType +if TYPE_CHECKING: + from airflow.models.dagrun import DagRunNote + AwareDatetimeAdapter = TypeAdapter(AwareDatetime) @@ -310,22 +313,36 @@ class DagRun(StrictBaseModel): @model_validator(mode="before") @classmethod def extract_dag_run_note(cls, data: Any) -> Any: - """Extract note content from dag_run_note relationship to avoid DetachedInstanceError.""" + """Extract note content from dag_run_note relationship to avoid DetachedInstanceError when constructing `DagRunContext` or `TIRunContext` data models.""" + from sqlalchemy import inspect as sa_inspect + from sqlalchemy.exc import NoInspectionAvailable + from sqlalchemy.orm.state import InstanceState + if isinstance(data, dict): return data - # For SQLAlchemy model, extract note from relationship while still attached - if hasattr(data, "dag_run_note"): - dag_run_note = data.dag_run_note + + # Check if this is a SQLAlchemy model by looking for the inspection interface + try: + insp: InstanceState = sa_inspect(data) + except NoInspectionAvailable: + # Not a SQLAlchemy object, return as-is for Pydantic to handle + return data + + # Check if dag_run_note is already loaded (avoid lazy load on detached instance) + if "dag_run_note" in insp.dict: + dag_run_note: DagRunNote | None = insp.dict["dag_run_note"] note_value = dag_run_note.content if dag_run_note else None - # Convert to dict to avoid further lazy loading issues - values = { - field_name: getattr(data, field_name, None) - for field_name in cls.model_fields - if field_name != "note" - } - values["note"] = note_value - return values - return data + else: + note_value = None + + # Convert to dict to avoid further lazy loading issues + values = { + field_name: getattr(data, field_name, None) + for field_name in cls.model_fields + if field_name != "note" + } + values["note"] = note_value + return values class TIRunContext(BaseModel): From 63d0095ba664cf5af63051dd6d9a02412ef542a5 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Thu, 12 Feb 2026 17:42:31 +0800 Subject: [PATCH 11/11] Respect note association_proxy instead of dulicating same logic --- .../execution_api/datamodels/taskinstance.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index 0aa216a31a511..4c2a76b3a1353 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -20,7 +20,7 @@ from collections.abc import Iterable from datetime import timedelta from enum import Enum -from typing import TYPE_CHECKING, Annotated, Any, Literal +from typing import Annotated, Any, Literal from pydantic import ( AwareDatetime, @@ -45,9 +45,6 @@ ) from airflow.utils.types import DagRunType -if TYPE_CHECKING: - from airflow.models.dagrun import DagRunNote - AwareDatetimeAdapter = TypeAdapter(AwareDatetime) @@ -313,7 +310,7 @@ class DagRun(StrictBaseModel): @model_validator(mode="before") @classmethod def extract_dag_run_note(cls, data: Any) -> Any: - """Extract note content from dag_run_note relationship to avoid DetachedInstanceError when constructing `DagRunContext` or `TIRunContext` data models.""" + """Extract the `note` (`str | None` from `association_proxy("dag_run_note", "content")`) relationship from `DagRun` to prevent `DetachedInstanceError` when constructing `DagRunContext` or `TIRunContext` models.""" from sqlalchemy import inspect as sa_inspect from sqlalchemy.exc import NoInspectionAvailable from sqlalchemy.orm.state import InstanceState @@ -329,9 +326,8 @@ def extract_dag_run_note(cls, data: Any) -> Any: return data # Check if dag_run_note is already loaded (avoid lazy load on detached instance) - if "dag_run_note" in insp.dict: - dag_run_note: DagRunNote | None = insp.dict["dag_run_note"] - note_value = dag_run_note.content if dag_run_note else None + if "note" in insp.dict: + note_value: str = insp.dict["note"] else: note_value = None