From 9e2992e1d5631e1c8e324347a698584903040b8c Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Fri, 6 Jun 2025 16:43:13 +0530 Subject: [PATCH 1/2] Make ``dag.test`` consistent with ``airflow dags test`` CLI command The only difference is CLI `airflow dags test` set logical date to current time, if not provided. While we default to `None` for dag.test. This was fine until but users using it for development of scheduled dags and using context will run into `KeyError: 'logical_date'` due to https://github.com/apache/airflow/issues/48412 --- task-sdk/src/airflow/sdk/definitions/dag.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index 449e101f7f565..d375aa1ce5dbd 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -54,7 +54,7 @@ from airflow.sdk.bases.operator import BaseOperator from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator from airflow.sdk.definitions._internal.node import validate_key -from airflow.sdk.definitions._internal.types import NOTSET +from airflow.sdk.definitions._internal.types import NOTSET, ArgNotSet from airflow.sdk.definitions.asset import AssetAll, BaseAsset from airflow.sdk.definitions.context import Context from airflow.sdk.definitions.param import DagParam, ParamsDict @@ -1014,7 +1014,7 @@ def _validate_owner_links(self, _, owner_links): def test( self, run_after: datetime | None = None, - logical_date: datetime | None = None, + logical_date: datetime | None = NOTSET, run_conf: dict[str, Any] | None = None, conn_file_path: str | None = None, variable_file_path: str | None = None, @@ -1082,6 +1082,10 @@ def add_logger_if_needed(ti: TaskInstance): with exit_stack: self.validate() + + # Allow users to explicitly pass None. If it isn't set, we default to current time. + logical_date = logical_date if not isinstance(logical_date, ArgNotSet) else timezone.utcnow() + log.debug("Clearing existing task instances for logical date %s", logical_date) # TODO: Replace with calling client.dag_run.clear in Execution API at some point SchedulerDAG.clear_dags( From 01a75476a69094c98c9cb3fb46cb265bf8578854 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Fri, 6 Jun 2025 17:21:49 +0530 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: Amogh Desai --- task-sdk/src/airflow/sdk/definitions/dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index d375aa1ce5dbd..87215f5531a71 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -1014,7 +1014,7 @@ def _validate_owner_links(self, _, owner_links): def test( self, run_after: datetime | None = None, - logical_date: datetime | None = NOTSET, + logical_date: datetime | None | ArgNotSet = NOTSET, run_conf: dict[str, Any] | None = None, conn_file_path: str | None = None, variable_file_path: str | None = None,