From 047aa8641b60a658d8f0ea5e92d702c96ee25062 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 30 Nov 2024 09:36:02 +0530 Subject: [PATCH 1/4] remove internal_api_call decorator --- airflow/models/dagrun.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index e7fa37c82f93a..bb793537ba3ba 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -52,7 +52,6 @@ from sqlalchemy_utils import UUIDType from airflow import settings -from airflow.api_internal.internal_api_call import internal_api_call from airflow.callbacks.callback_requests import DagCallbackRequest from airflow.configuration import conf as airflow_conf from airflow.exceptions import AirflowException, TaskNotFound @@ -611,7 +610,6 @@ def generate_run_id(run_type: DagRunType, logical_date: datetime) -> str: return DagRunType(run_type).generate_run_id(logical_date) @staticmethod - @internal_api_call @provide_session def fetch_task_instances( dag_id: str | None = None, @@ -648,7 +646,6 @@ def fetch_task_instances( tis = tis.where(TI.task_id.in_(task_ids)) return session.scalars(tis).all() - @internal_api_call def _check_last_n_dagruns_failed(self, dag_id, max_consecutive_failed_dag_runs, session): """Check if last N dags failed.""" dag_runs = ( @@ -735,7 +732,6 @@ def get_task_instance( ) @staticmethod - @internal_api_call @provide_session def fetch_task_instance( dag_id: str, @@ -768,7 +764,6 @@ def get_dag(self) -> DAG: return self.dag @staticmethod - @internal_api_call @provide_session def get_previous_dagrun( dag_run: DagRun | DagRunPydantic, state: DagRunState | None = None, session: Session = NEW_SESSION @@ -789,7 +784,6 @@ def get_previous_dagrun( return session.scalar(select(DagRun).where(*filters).order_by(DagRun.logical_date.desc()).limit(1)) @staticmethod - @internal_api_call @provide_session def get_previous_scheduled_dagrun( dag_run_id: int, @@ -1710,7 +1704,6 @@ def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate | L return DagRun._get_log_template(log_template_id=self.log_template_id, session=session) @staticmethod - @internal_api_call @provide_session def _get_log_template( log_template_id: int | None, session: Session = NEW_SESSION From 0a11297cbff109bbf2a4cba7b0ab46322ebf7852 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 30 Nov 2024 18:39:00 +0530 Subject: [PATCH 2/4] remove *Pydantic types --- airflow/models/dagrun.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index bb793537ba3ba..b28d2fbf02fc1 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -85,9 +85,6 @@ from airflow.models.dag import DAG from airflow.models.operator import Operator - from airflow.serialization.pydantic.dag_run import DagRunPydantic - from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic - from airflow.serialization.pydantic.tasklog import LogTemplatePydantic from airflow.typing_compat import Literal from airflow.utils.types import ArgNotSet @@ -716,7 +713,7 @@ def get_task_instance( session: Session = NEW_SESSION, *, map_index: int = -1, - ) -> TI | TaskInstancePydantic | None: + ) -> TI | None: """ Return the task instance specified by task_id for this dag run. @@ -739,7 +736,7 @@ def fetch_task_instance( task_id: str, session: Session = NEW_SESSION, map_index: int = -1, - ) -> TI | TaskInstancePydantic | None: + ) -> TI | None: """ Return the task instance specified by task_id for this dag run. @@ -766,7 +763,7 @@ def get_dag(self) -> DAG: @staticmethod @provide_session def get_previous_dagrun( - dag_run: DagRun | DagRunPydantic, state: DagRunState | None = None, session: Session = NEW_SESSION + dag_run: DagRun, state: DagRunState | None = None, session: Session = NEW_SESSION ) -> DagRun | None: """ Return the previous DagRun, if there is one. @@ -1700,14 +1697,12 @@ def schedule_tis( return count @provide_session - def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate | LogTemplatePydantic: + def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate: return DagRun._get_log_template(log_template_id=self.log_template_id, session=session) @staticmethod @provide_session - def _get_log_template( - log_template_id: int | None, session: Session = NEW_SESSION - ) -> LogTemplate | LogTemplatePydantic: + def _get_log_template(log_template_id: int | None, session: Session = NEW_SESSION) -> LogTemplate: template: LogTemplate | None if log_template_id is None: # DagRun created before LogTemplate introduction. template = session.scalar(select(LogTemplate).order_by(LogTemplate.id).limit(1)) From 022bd229d2ff77894c8e937fa1d9653980114170 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 30 Nov 2024 18:59:37 +0530 Subject: [PATCH 3/4] fix test --- airflow/models/taskinstance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 591f3549bab39..370cf9eb9694b 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1257,7 +1257,7 @@ def _record_task_map_for_downstreams( def _get_previous_dagrun( *, - task_instance: TaskInstance | TaskInstancePydantic, + task_instance: TaskInstance, state: DagRunState | None = None, session: Session | None = None, ) -> DagRun | None: From 44cacd88602805114e2d461b3e1e1200ffd4b9dd Mon Sep 17 00:00:00 2001 From: kalyanr Date: Sat, 30 Nov 2024 19:07:44 +0530 Subject: [PATCH 4/4] fix test --- airflow/models/taskinstance.py | 2 +- airflow/serialization/pydantic/taskinstance.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 370cf9eb9694b..591f3549bab39 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1257,7 +1257,7 @@ def _record_task_map_for_downstreams( def _get_previous_dagrun( *, - task_instance: TaskInstance, + task_instance: TaskInstance | TaskInstancePydantic, state: DagRunState | None = None, session: Session | None = None, ) -> DagRun | None: diff --git a/airflow/serialization/pydantic/taskinstance.py b/airflow/serialization/pydantic/taskinstance.py index 87d6f48f111b8..431903a8b9fce 100644 --- a/airflow/serialization/pydantic/taskinstance.py +++ b/airflow/serialization/pydantic/taskinstance.py @@ -220,7 +220,7 @@ def xcom_push( session=session, ) - def get_dagrun(self, session: Session | None = None) -> DagRunPydantic: + def get_dagrun(self, session: Session | None = None) -> DagRun: """ Return the DagRun for this TaskInstance.