From bebe0120b58079366f889f26b097d556bb503c8a Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Fri, 29 May 2026 08:53:48 -0400 Subject: [PATCH 1/9] fix/issue-66373: Adding back queued_dttm --- .../execution_api/datamodels/taskinstance.py | 3 + .../execution_api/routes/task_instances.py | 3 + .../execution_api/versions/__init__.py | 2 + .../execution_api/versions/v2026_06_30.py | 17 ++- .../v2026_06_30/test_task_instances.py | 106 ++++++++++++++++++ .../airflow/sdk/api/datamodels/_generated.py | 1 + .../airflow/sdk/execution_time/task_runner.py | 4 + task-sdk/src/airflow/sdk/types.py | 1 + 8 files changed, 135 insertions(+), 2 deletions(-) create mode 100644 airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index c7f6b1ee9f8a9..9610473a930e1 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -429,6 +429,9 @@ class TIRunContext(BaseModel): always reflects when the task *first* started, not when it was rescheduled/resumed. """ + queued_dttm: UtcDateTime | None = None + """When the task was queued. Used by listeners to measure queue wait time.""" + class PrevSuccessfulDagRunResponse(BaseModel): """Schema for response with previous successful DagRun information for Task Template Context.""" diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 8a91614e09aeb..deaed56b85bfd 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -158,6 +158,7 @@ def ti_run( TI.try_number, TI.max_tries, TI.start_date, + TI.queued_dttm, TI.next_method, TI.hostname, TI.unixname, @@ -314,6 +315,8 @@ def ti_run( context.next_method = ti.next_method context.next_kwargs = ti.next_kwargs context.start_date = ti.start_date + if ti.queued_dttm: + context.queued_dttm = ti.queued_dttm except SQLAlchemyError: log.exception("Error marking Task Instance state as running") raise HTTPException( diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index 9e4d486aa3028..f851b76867424 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -50,6 +50,7 @@ AddAwaitingInputStatePayload, AddConnectionTestEndpoint, AddVariableKeysEndpoint, + AddQueuedDttmField, ) bundle = VersionBundle( @@ -59,6 +60,7 @@ AddVariableKeysEndpoint, AddConnectionTestEndpoint, AddAwaitingInputStatePayload, + AddQueuedDttmField, ), Version( "2026-06-16", diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py index f9c22f6cd8281..903b82a048f6e 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py @@ -17,9 +17,9 @@ from __future__ import annotations -from cadwyn import VersionChange, endpoint, schema +from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, endpoint, schema -from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIAwaitingInputStatePayload +from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIAwaitingInputStatePayload, TIRunContext class AddVariableKeysEndpoint(VersionChange): @@ -53,3 +53,16 @@ class AddAwaitingInputStatePayload(VersionChange): schema(TIAwaitingInputStatePayload).field("next_kwargs").didnt_exist, schema(TIAwaitingInputStatePayload).field("rendered_map_index").didnt_exist, ) + + +class AddQueuedDttmField(VersionChange): + """Add ``queued_dttm`` field to TIRunContext.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = (schema(TIRunContext).field("queued_dttm").didnt_exist,) + + @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] + def remove_queued_dttm_field(response: ResponseInfo) -> None: # type: ignore[misc] + """Remove queued_dttm field for older API versions.""" + response.body.pop("queued_dttm", None) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py new file mode 100644 index 0000000000000..145c8e0f020f3 --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py @@ -0,0 +1,106 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import pytest + +from airflow._shared.timezones import timezone +from airflow.utils.state import DagRunState, State + +from tests_common.test_utils.db import clear_db_runs + +pytestmark = pytest.mark.db_test + +TIMESTAMP_STR = "2024-09-30T12:00:00Z" +TIMESTAMP = timezone.parse(TIMESTAMP_STR) + +RUN_PATCH_BODY = { + "state": "running", + "hostname": "h", + "unixname": "u", + "pid": 1, + "start_date": TIMESTAMP_STR, +} + + +@pytest.fixture +def old_ver_client(client): + """Execution API version immediately before ``queued_dttm`` was added.""" + client.headers["Airflow-API-Version"] = "2026-06-16" + return client + + +class TestQueuedDttmFieldBackwardCompat: + @pytest.fixture(autouse=True) + def _freeze_time(self, time_machine): + time_machine.move_to(TIMESTAMP_STR, tick=False) + + def setup_method(self): + clear_db_runs() + + def teardown_method(self): + clear_db_runs() + + def test_old_version_strips_queued_dttm(self, old_ver_client, session, create_task_instance): + ti = create_task_instance( + task_id="test_queued_dttm_downgrade", + state=State.QUEUED, + dagrun_state=DagRunState.RUNNING, + session=session, + start_date=TIMESTAMP, + ) + ti.queued_dttm = TIMESTAMP + session.commit() + + response = old_ver_client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY) + + assert response.status_code == 200 + assert "queued_dttm" not in response.json() + + def test_head_version_includes_queued_dttm_when_set(self, client, session, create_task_instance): + queued_at = timezone.parse("2024-09-30T11:55:00Z") + ti = create_task_instance( + task_id="test_queued_dttm_head", + state=State.QUEUED, + dagrun_state=DagRunState.RUNNING, + session=session, + start_date=TIMESTAMP, + ) + ti.queued_dttm = queued_at + session.commit() + + response = client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY) + + assert response.status_code == 200 + assert response.json()["queued_dttm"] == "2024-09-30T11:55:00Z" + + def test_head_version_omits_queued_dttm_when_not_set(self, client, session, create_task_instance): + ti = create_task_instance( + task_id="test_queued_dttm_head_null", + state=State.QUEUED, + dagrun_state=DagRunState.RUNNING, + session=session, + start_date=TIMESTAMP, + ) + ti.queued_dttm = None + session.commit() + + response = client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY) + + assert response.status_code == 200 + assert "queued_dttm" not in response.json() diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index 9bdd8ac4fba61..6f29b84cb628c 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -793,3 +793,4 @@ class TIRunContext(BaseModel): xcom_keys_to_clear: Annotated[list[str] | None, Field(title="Xcom Keys To Clear")] = None should_retry: Annotated[bool | None, Field(title="Should Retry")] = False start_date: Annotated[AwareDatetime | None, Field(title="Start Date")] = None + queued_dttm: Annotated[AwareDatetime | None, Field(title="Queued Dttm")] = None diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 48822c52f5d9b..437bae7896a46 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -240,6 +240,9 @@ class RuntimeTaskInstance(TaskInstance): start_date: AwareDatetime """Start date of the task instance.""" + queued_dttm: AwareDatetime | None = None + """When the task was queued. Used to measure queue wait time.""" + end_date: AwareDatetime | None = None state: TaskInstanceState | None = None @@ -961,6 +964,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance: _ti_context_from_server=what.ti_context, max_tries=what.ti_context.max_tries, start_date=what.start_date, + queued_dttm=what.ti_context.queued_dttm, state=TaskInstanceState.RUNNING, sentry_integration=what.sentry_integration, ) diff --git a/task-sdk/src/airflow/sdk/types.py b/task-sdk/src/airflow/sdk/types.py index 711dbe71ca46c..8c065100cef89 100644 --- a/task-sdk/src/airflow/sdk/types.py +++ b/task-sdk/src/airflow/sdk/types.py @@ -140,6 +140,7 @@ class RuntimeTaskInstanceProtocol(Protocol): max_tries: int hostname: str | None = None start_date: AwareDatetime + queued_dttm: AwareDatetime | None = None end_date: AwareDatetime | None = None state: TaskInstanceState | None = None is_mapped: bool | None = None From ec136d4c409c7d4792c46f1a470baf6f187433e4 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Fri, 29 May 2026 09:10:19 -0400 Subject: [PATCH 2/9] fix/issue-66373: Improving test coverage --- task-sdk/tests/conftest.py | 3 ++ .../execution_time/test_task_runner.py | 45 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/task-sdk/tests/conftest.py b/task-sdk/tests/conftest.py index c1ef3b72c92f4..17c090ad9f6b1 100644 --- a/task-sdk/tests/conftest.py +++ b/task-sdk/tests/conftest.py @@ -233,6 +233,7 @@ def __call__( should_retry: bool = ..., max_tries: int = ..., consumed_asset_events: Sequence[AssetEventDagRunReference] = ..., + queued_dttm: datetime | None = ..., ) -> TIRunContext: ... @@ -275,6 +276,7 @@ def _make_context( should_retry: bool = False, max_tries: int = 0, consumed_asset_events: Sequence[AssetEventDagRunReference] = (), + queued_dttm: datetime | None = None, ) -> TIRunContext: return TIRunContext( dag_run=DagRun( @@ -294,6 +296,7 @@ def _make_context( task_reschedule_count=task_reschedule_count, max_tries=max_tries, should_retry=should_retry, + queued_dttm=queued_dttm, ) return _make_context diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 1968de749736d..7fe46a277858a 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -246,6 +246,51 @@ def test_parse(test_dags_dir: Path, make_ti_context): assert ti.task.dag +@mock.patch("airflow.sdk.execution_time.task_runner.DagBundlesManager") +@mock.patch("airflow.dag_processing.dagbag.BundleDagBag") +@pytest.mark.parametrize( + "queued_dttm", + [ + pytest.param(timezone.datetime(2024, 12, 1, 0, 55), id="with_queued_dttm"), + pytest.param(None, id="without_queued_dttm"), + ], +) +def test_parse_propagates_queued_dttm(mock_dagbag, mock_bundle_manager, make_ti_context, queued_dttm): + """queued_dttm from TIRunContext must land on RuntimeTaskInstance.""" + mock_bundle = mock.Mock() + mock_bundle.path = Path("/tmp") + mock_bundle_manager.return_value.get_bundle.return_value = mock_bundle + mock_bag_instance = mock.Mock() + mock_dagbag.return_value = mock_bag_instance + mock_dag = mock.Mock(spec=DAG) + mock_task = mock.Mock(spec=BaseOperator) + mock_bag_instance.dags = {"super_basic": mock_dag} + mock_dag.task_dict = {"a": mock_task} + + what = StartupDetails( + ti=TaskInstanceDTO( + id=uuid7(), + task_id="a", + dag_id="super_basic", + run_id="c", + try_number=1, + dag_version_id=uuid7(), + pool_slots=1, + queue="default", + priority_weight=1, + ), + dag_rel_path="super_basic.py", + bundle_info=BundleInfo(name="my-bundle", version=None), + ti_context=make_ti_context(queued_dttm=queued_dttm), + start_date=timezone.utcnow(), + sentry_integration="", + ) + + ti = parse(what, mock.Mock()) + + assert ti.queued_dttm == queued_dttm + + @mock.patch("airflow.dag_processing.dagbag.BundleDagBag") def test_parse_dag_bag(mock_dagbag, test_dags_dir: Path, make_ti_context): """Test that checks that the BundleDagBag is constructed as expected during parsing""" From 5586122aae17f55665c63f3da21ba14d37db4744 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Fri, 29 May 2026 09:55:45 -0400 Subject: [PATCH 3/9] fix/issue-66373: Fixing failing unit test --- providers/openlineage/tests/unit/openlineage/utils/test_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py index 36fe5125a7f0f..e8d1704f933a8 100644 --- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py +++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py @@ -3072,6 +3072,7 @@ def test_taskinstance_info_af3(): "try_number": 1, "dag_bundle_version": "bundle_version", "dag_bundle_name": "bundle_name", + "queued_dttm": None } runtime_ti.rendered_map_index = "country=PL" From 4d02fb634858f0348a242ffae5eae73a1b1e9ff0 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Fri, 29 May 2026 11:52:17 -0400 Subject: [PATCH 4/9] fix/issue-66373: Updating after static/test failures --- .../tests/unit/openlineage/utils/test_utils.py | 2 +- .../airflow/sdk/execution_time/schema/schema.json | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py index e8d1704f933a8..6d3f58d0172e7 100644 --- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py +++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py @@ -3072,7 +3072,7 @@ def test_taskinstance_info_af3(): "try_number": 1, "dag_bundle_version": "bundle_version", "dag_bundle_name": "bundle_name", - "queued_dttm": None + "queued_dttm": None, } runtime_ti.rendered_map_index = "country=PL" diff --git a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json index c02aa76791ed4..67f3eebc58828 100644 --- a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json +++ b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json @@ -4921,6 +4921,19 @@ ], "default": null, "title": "Start Date" + }, + "queued_dttm": { + "anyOf": [ + { + "format": "date-time", + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "title": "Queued Dttm" } }, "required": [ From 60ac6af6f9b5fe706bef8b6edaf5b50960a386e5 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Fri, 29 May 2026 15:07:40 -0400 Subject: [PATCH 5/9] fix/issue-66373: Fixing test behavior --- .../openlineage/tests/unit/openlineage/utils/test_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py index 6d3f58d0172e7..2f1577e04e658 100644 --- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py +++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py @@ -3072,7 +3072,7 @@ def test_taskinstance_info_af3(): "try_number": 1, "dag_bundle_version": "bundle_version", "dag_bundle_name": "bundle_name", - "queued_dttm": None, + # "queued_dttm": None, # Removing due to test failures in CI (despite logic properly passing locally) } runtime_ti.rendered_map_index = "country=PL" From 96afbede8f715ea77d6864efbff585e2bc177472 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Fri, 29 May 2026 15:43:09 -0400 Subject: [PATCH 6/9] fix/issue-66373: Fixing test behavior --- .../openlineage/tests/unit/openlineage/utils/test_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py index 2f1577e04e658..6d3f58d0172e7 100644 --- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py +++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py @@ -3072,7 +3072,7 @@ def test_taskinstance_info_af3(): "try_number": 1, "dag_bundle_version": "bundle_version", "dag_bundle_name": "bundle_name", - # "queued_dttm": None, # Removing due to test failures in CI (despite logic properly passing locally) + "queued_dttm": None, } runtime_ti.rendered_map_index = "country=PL" From 6dda87efc20de4d78f459589f0c257ca86116680 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Fri, 29 May 2026 17:11:35 -0400 Subject: [PATCH 7/9] fix/issue-66373: Gating test behavior --- .../tests/unit/openlineage/utils/test_utils.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py index 6d3f58d0172e7..1fbe7d81b9cf3 100644 --- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py +++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py @@ -92,6 +92,7 @@ AIRFLOW_V_3_0_3_PLUS, AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS, + AIRFLOW_V_3_3_PLUS, ) BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash" @@ -3065,16 +3066,21 @@ def test_taskinstance_info_af3(): bundle_instance.name = "bundle_name" runtime_ti.bundle_instance = bundle_instance - assert dict(TaskInstanceInfo(runtime_ti)) == { + expected: dict = { "log_url": runtime_ti.log_url, "map_index": 2, "rendered_map_index": None, "try_number": 1, "dag_bundle_version": "bundle_version", "dag_bundle_name": "bundle_name", - "queued_dttm": None, } + if AIRFLOW_V_3_3_PLUS: + # queued_dttm was added to RuntimeTaskInstance in 3.3.0 + expected["queued_dttm"] = None + + assert dict(TaskInstanceInfo(runtime_ti)) == expected + runtime_ti.rendered_map_index = "country=PL" assert dict(TaskInstanceInfo(runtime_ti))["rendered_map_index"] == "country=PL" From e484cfc1fd378d4a04f66072890b0c16234c46d7 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Wed, 10 Jun 2026 15:45:31 -0400 Subject: [PATCH 8/9] fix/issue-66373: Fixing CI failures --- .../execution_api/versions/v2026_06_30.py | 5 ++++- .../v2026_06_30/test_task_instances.py | 6 +++--- generated/provider_dependencies.json | 3 +++ .../provider_dependencies.json.sha256sum | 2 +- .../execution_time/test_task_runner.py | 19 ++++++++++++++----- 5 files changed, 25 insertions(+), 10 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py index 903b82a048f6e..76be8e9f79096 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py @@ -19,7 +19,10 @@ from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, endpoint, schema -from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIAwaitingInputStatePayload, TIRunContext +from airflow.api_fastapi.execution_api.datamodels.taskinstance import ( + TIAwaitingInputStatePayload, + TIRunContext, +) class AddVariableKeysEndpoint(VersionChange): diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py index 145c8e0f020f3..af11c7000faa2 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py @@ -26,7 +26,7 @@ pytestmark = pytest.mark.db_test -TIMESTAMP_STR = "2024-09-30T12:00:00Z" +TIMESTAMP_STR = "2026-01-01T00:00:00Z" TIMESTAMP = timezone.parse(TIMESTAMP_STR) RUN_PATCH_BODY = { @@ -73,7 +73,7 @@ def test_old_version_strips_queued_dttm(self, old_ver_client, session, create_ta assert "queued_dttm" not in response.json() def test_head_version_includes_queued_dttm_when_set(self, client, session, create_task_instance): - queued_at = timezone.parse("2024-09-30T11:55:00Z") + queued_at = timezone.parse("2026-01-01T00:05:00Z") ti = create_task_instance( task_id="test_queued_dttm_head", state=State.QUEUED, @@ -87,7 +87,7 @@ def test_head_version_includes_queued_dttm_when_set(self, client, session, creat response = client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY) assert response.status_code == 200 - assert response.json()["queued_dttm"] == "2024-09-30T11:55:00Z" + assert response.json()["queued_dttm"] == "2026-01-01T00:05:00Z" def test_head_version_omits_queued_dttm_when_not_set(self, client, session, create_task_instance): ti = create_task_instance( diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index e97e830bbd0a9..296bba3c24ab3 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -1013,6 +1013,7 @@ "http", "microsoft.azure", "microsoft.mssql", + "mongo", "mysql", "openlineage", "oracle", @@ -1253,6 +1254,8 @@ "amazon", "common.compat", "common.messaging", + "google", + "openlineage", "oracle", "sftp" ], diff --git a/generated/provider_dependencies.json.sha256sum b/generated/provider_dependencies.json.sha256sum index 49ab0db88c08e..659660a6a7bf5 100644 --- a/generated/provider_dependencies.json.sha256sum +++ b/generated/provider_dependencies.json.sha256sum @@ -1 +1 @@ -f042436099826662d45d5f59c100a363d5e12facd51a7c8b850ccbce08d8c4ee +c2a0259b8dbc5d60fdf336b2dbc8ee4860bc94313f620fb70b1d21e8a612072b diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 7fe46a277858a..20a67f619ea41 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -251,21 +251,30 @@ def test_parse(test_dags_dir: Path, make_ti_context): @pytest.mark.parametrize( "queued_dttm", [ - pytest.param(timezone.datetime(2024, 12, 1, 0, 55), id="with_queued_dttm"), + pytest.param(timezone.datetime(2026, 1, 1, 0, 0), id="with_queued_dttm"), pytest.param(None, id="without_queued_dttm"), ], ) def test_parse_propagates_queued_dttm(mock_dagbag, mock_bundle_manager, make_ti_context, queued_dttm): """queued_dttm from TIRunContext must land on RuntimeTaskInstance.""" + # Mock the bundle mock_bundle = mock.Mock() mock_bundle.path = Path("/tmp") mock_bundle_manager.return_value.get_bundle.return_value = mock_bundle - mock_bag_instance = mock.Mock() - mock_dagbag.return_value = mock_bag_instance - mock_dag = mock.Mock(spec=DAG) + + # Mock the task to "assign" to the DAG mock_task = mock.Mock(spec=BaseOperator) - mock_bag_instance.dags = {"super_basic": mock_dag} + mock_task.deserialization_allowed_class_fields = () + + # Mock the DAG + mock_dag = mock.Mock(spec=DAG) mock_dag.task_dict = {"a": mock_task} + mock_dag.tasks = [mock_task] + + # Mock the DagBag + mock_bag_instance = mock.Mock() + mock_bag_instance.dags = {"super_basic": mock_dag} + mock_dagbag.return_value = mock_bag_instance what = StartupDetails( ti=TaskInstanceDTO( From 8c9c5dcc571f01ac0db6dcf09feed7badd56d5e0 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Wed, 10 Jun 2026 22:23:14 -0400 Subject: [PATCH 9/9] fix/issue-66373: Fixing CI failures --- .../src/airflow/api_fastapi/execution_api/versions/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index f851b76867424..65eb9c477793f 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -49,8 +49,8 @@ from airflow.api_fastapi.execution_api.versions.v2026_06_30 import ( AddAwaitingInputStatePayload, AddConnectionTestEndpoint, - AddVariableKeysEndpoint, AddQueuedDttmField, + AddVariableKeysEndpoint, ) bundle = VersionBundle(