From 9d9b7bcfdba1798c7cee43bda0aed5d839a2fb6a Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sun, 7 Dec 2025 22:11:25 +0100 Subject: [PATCH 1/6] [v3-1-test] Fix inconsistent Dag hashes when template fields contain unordered dicts (#59091) (#59175) Dags using operators with dictionary values in template_fields (such as env_vars) were getting different hashes on each parse, even when the actual dictionary content was unchanged. This happened because serialize_template_field converts dictionaries to string using str(), which preserves insertion order. When dictionary ordering varies between parses (e.g., when env_vars comes from os.environ.copy()), the string representation differs, causing inconsistent hashing. Prevents unnecessary Dag updates and reprocessing when only dictionary ordering differs in template fields. (cherry picked from commit d46a9d120657d7fa0c70477d961120024fecd893) Co-authored-by: Ephraim Anierobi --- .../src/airflow/serialization/helpers.py | 14 +++++ .../tests/unit/models/test_serialized_dag.py | 53 +++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/airflow-core/src/airflow/serialization/helpers.py b/airflow-core/src/airflow/serialization/helpers.py index 949b3cb9c9f09..5f564df6188ed 100644 --- a/airflow-core/src/airflow/serialization/helpers.py +++ b/airflow-core/src/airflow/serialization/helpers.py @@ -51,6 +51,16 @@ def translate_tuples_to_lists(obj: Any): return {key: translate_tuples_to_lists(value) for key, value in obj.items()} return obj + def sort_dict_recursively(obj: Any) -> Any: + """Recursively sort dictionaries to ensure consistent ordering.""" + if isinstance(obj, dict): + return {k: sort_dict_recursively(v) for k, v in sorted(obj.items())} + if isinstance(obj, list): + return [sort_dict_recursively(item) for item in obj] + if isinstance(obj, tuple): + return tuple(sort_dict_recursively(item) for item in obj) + return obj + max_length = conf.getint("core", "max_templated_field_length") if not is_jsonable(template_field): @@ -70,6 +80,10 @@ def translate_tuples_to_lists(obj: Any): # and need to be converted to lists return template_field template_field = translate_tuples_to_lists(template_field) + # Sort dictionaries recursively to ensure consistent string representation + # This prevents hash inconsistencies when dict ordering varies + if isinstance(template_field, dict): + template_field = sort_dict_recursively(template_field) serialized = str(template_field) if len(serialized) > max_length: rendered = redact(serialized, name) diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index 92446cee2189c..a81ecaf13fcc8 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -545,6 +545,59 @@ def test_hash_method_removes_fileloc_and_remains_consistent(self): assert "fileloc" in test_data["dag"] assert test_data["dag"]["fileloc"] == "/different/path/to/dag.py" + def test_hash_method_consistent_with_dict_ordering_in_template_fields(self, dag_maker): + from airflow.sdk.bases.operator import BaseOperator + + class MyCustomOp(BaseOperator): + template_fields = ("env_vars",) + + def __init__(self, *, task_id: str, **kwargs): + super().__init__(task_id=task_id, **kwargs) + self.env_vars = {"KEY1": "value1", "KEY2": "value2", "KEY3": "value3"} + + # Create first DAG with env_vars in one order + with dag_maker("test_dag") as dag1: + MyCustomOp(task_id="task1") + + serialized_dag_1 = SerializedDAG.to_dict(dag1) + + # Create second DAG with env_vars in different order + with dag_maker("test_dag") as dag2: + task = MyCustomOp(task_id="task1") + # Recreate dict with different insertion order + task.env_vars = {"KEY3": "value3", "KEY1": "value1", "KEY2": "value2"} + + serialized_dag_2 = SerializedDAG.to_dict(dag2) + + # Verify that the original env_vars have different ordering + env_vars_1 = None + env_vars_2 = None + for task in serialized_dag_1["dag"]["tasks"]: + if task["__var"]["task_id"] == "task1": + env_vars_1 = task["__var"].get("env_vars") + for task in serialized_dag_2["dag"]["tasks"]: + if task["__var"]["task_id"] == "task1": + env_vars_2 = task["__var"].get("env_vars") + + assert env_vars_1 is not None, "serialized_dag_1 should have env_vars" + assert env_vars_2 is not None, "serialized_dag_2 should have env_vars" + # The serialized env_vars should be sorted dicts (or strings if truncated) + # If they're dicts, verify they're sorted; if strings, they should be equal due to sorting + if isinstance(env_vars_1, dict) and isinstance(env_vars_2, dict): + # Both should be sorted dictionaries with same content + assert list(env_vars_1.keys()) == sorted(env_vars_1.keys()) + assert list(env_vars_2.keys()) == sorted(env_vars_2.keys()) + assert env_vars_1 == env_vars_2, "Sorted dicts should be equal regardless of original order" + elif isinstance(env_vars_1, str) and isinstance(env_vars_2, str): + # If truncated to strings, they should be equal due to sorting + assert env_vars_1 == env_vars_2, "String representations should be equal due to sorting" + + hash_1 = SDM.hash(serialized_dag_1) + hash_2 = SDM.hash(serialized_dag_2) + + # Hashes should be identical + assert hash_1 == hash_2, "Hashes should be identical when dicts are sorted consistently" + def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session): """ Test that dynamic DAG update gracefully handles case where SerializedDagModel doesn't exist. From 696c6ffb15510548af4c01912c75f3786f47113a Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Mon, 8 Dec 2025 09:22:09 +0100 Subject: [PATCH 2/6] Revert "Find only relevant up/downstream tis when clearing (#57758) (#58987)" This reverts commit 8918f98665f149a454e14c0a8cc7d2b09474d93c. --- .../core_api/routes/public/task_instances.py | 60 ++--- .../example_dynamic_task_mapping.py | 32 +-- .../src/airflow/models/taskinstance.py | 246 ++++++------------ .../routes/public/test_task_instances.py | 21 -- .../tests/unit/models/test_taskinstance.py | 54 ---- 5 files changed, 105 insertions(+), 308 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 82e7fad3493bb..68764b5456aa6 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -729,54 +729,32 @@ def post_clear_task_instances( if future: body.end_date = None - if (task_markers_to_clear := body.task_ids) is not None: - mapped_tasks_tuples = {t for t in task_markers_to_clear if isinstance(t, tuple)} + task_ids = body.task_ids + if task_ids is not None: + tasks = set(task_ids) + mapped_tasks_tuples = set(t for t in tasks if isinstance(t, tuple)) # Unmapped tasks are expressed in their task_ids (without map_indexes) - normal_task_ids = {t for t in task_markers_to_clear if not isinstance(t, tuple)} - - def _collect_relatives(run_id: str, direction: Literal["upstream", "downstream"]) -> None: - from airflow.models.taskinstance import find_relevant_relatives - - relevant_relatives = find_relevant_relatives( - normal_task_ids, - mapped_tasks_tuples, - dag=dag, - run_id=run_id, - direction=direction, - session=session, + unmapped_task_ids = set(t for t in tasks if not isinstance(t, tuple)) + + if upstream or downstream: + mapped_task_ids = set(tid for tid, _ in mapped_tasks_tuples) + relatives = dag.partial_subset( + task_ids=unmapped_task_ids | mapped_task_ids, + include_downstream=downstream, + include_upstream=upstream, + exclude_original=True, ) - normal_task_ids.update(t for t in relevant_relatives if not isinstance(t, tuple)) - mapped_tasks_tuples.update(t for t in relevant_relatives if isinstance(t, tuple)) - - # We can't easily calculate upstream/downstream map indexes when not - # working for a specific dag run. It's possible by looking at the runs - # one by one, but that is both resource-consuming and logically complex. - # So instead we'll just clear all the tis based on task ID and hope - # that's good enough for most cases. - if dag_run_id is None: - if upstream or downstream: - partial_dag = dag.partial_subset( - task_ids=normal_task_ids.union(tid for tid, _ in mapped_tasks_tuples), - include_downstream=downstream, - include_upstream=upstream, - exclude_original=True, - ) - normal_task_ids.update(partial_dag.task_dict) - else: - if upstream: - _collect_relatives(dag_run_id, "upstream") - if downstream: - _collect_relatives(dag_run_id, "downstream") - - task_markers_to_clear = [ - *normal_task_ids, - *((t, m) for t, m in mapped_tasks_tuples if t not in normal_task_ids), + unmapped_task_ids = unmapped_task_ids | set(relatives.task_dict.keys()) + + mapped_tasks_list = [ + (tid, map_id) for tid, map_id in mapped_tasks_tuples if tid not in unmapped_task_ids ] + task_ids = mapped_tasks_list + list(unmapped_task_ids) # Prepare common parameters common_params = { "dry_run": True, - "task_ids": task_markers_to_clear, + "task_ids": task_ids, "session": session, "run_on_latest_version": body.run_on_latest_version, "only_failed": body.only_failed, diff --git a/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py b/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py index c7b3a02301daa..750c3da1ec17b 100644 --- a/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py +++ b/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py @@ -22,9 +22,9 @@ # [START example_dynamic_task_mapping] from datetime import datetime -from airflow.sdk import DAG, task, task_group +from airflow.sdk import DAG, task -with DAG(dag_id="example_dynamic_task_mapping", schedule=None, start_date=datetime(2022, 3, 4)): +with DAG(dag_id="example_dynamic_task_mapping", schedule=None, start_date=datetime(2022, 3, 4)) as dag: @task def add_one(x: int): @@ -39,11 +39,8 @@ def sum_it(values): sum_it(added_values) with DAG( - dag_id="example_task_mapping_second_order", - schedule=None, - catchup=False, - start_date=datetime(2022, 3, 4), -): + dag_id="example_task_mapping_second_order", schedule=None, catchup=False, start_date=datetime(2022, 3, 4) +) as dag2: @task def get_nums(): @@ -61,25 +58,4 @@ def add_10(num): _times_2 = times_2.expand(num=_get_nums) add_10.expand(num=_times_2) -with DAG( - dag_id="example_task_group_mapping", - schedule=None, - catchup=False, - start_date=datetime(2022, 3, 4), -): - - @task_group - def op(num): - @task - def add_1(num): - return num + 1 - - @task - def mul_2(num): - return num * 2 - - return mul_2(add_1(num)) - - op.expand(num=[1, 2, 3]) - # [END example_dynamic_task_mapping] diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 06597a24fc93d..216bc9b1bcad6 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -28,7 +28,7 @@ from collections.abc import Collection, Iterable from datetime import timedelta from functools import cache -from typing import TYPE_CHECKING, Any, cast +from typing import TYPE_CHECKING, Any from urllib.parse import quote import attrs @@ -121,7 +121,7 @@ from airflow.sdk.definitions.asset import AssetUniqueKey from airflow.sdk.types import RuntimeTaskInstanceProtocol from airflow.serialization.definitions.taskgroup import SerializedTaskGroup - from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG + from airflow.serialization.serialized_objects import SerializedBaseOperator from airflow.utils.context import Context Operator: TypeAlias = MappedOperator | SerializedBaseOperator @@ -2037,16 +2037,87 @@ def get_relevant_upstream_map_indexes( *, session: Session, ) -> int | range | None: + """ + Infer the map indexes of an upstream "relevant" to this ti. + + The bulk of the logic mainly exists to solve the problem described by + the following example, where 'val' must resolve to different values, + depending on where the reference is being used:: + + @task + def this_task(v): # This is self.task. + return v * 2 + + + @task_group + def tg1(inp): + val = upstream(inp) # This is the upstream task. + this_task(val) # When inp is 1, val here should resolve to 2. + return val + + + # This val is the same object returned by tg1. + val = tg1.expand(inp=[1, 2, 3]) + + + @task_group + def tg2(inp): + another_task(inp, val) # val here should resolve to [2, 4, 6]. + + + tg2.expand(inp=["a", "b"]) + + The surrounding mapped task groups of ``upstream`` and ``self.task`` are + inspected to find a common "ancestor". If such an ancestor is found, + we need to return specific map indexes to pull a partial value from + upstream XCom. + + :param upstream: The referenced upstream task. + :param ti_count: The total count of task instance this task was expanded + by the scheduler, i.e. ``expanded_ti_count`` in the template context. + :return: Specific map index or map indexes to pull, or ``None`` if we + want to "whole" return value (i.e. no mapped task groups involved). + """ + from airflow.models.mappedoperator import get_mapped_ti_count + if TYPE_CHECKING: - assert self.task - return _get_relevant_map_indexes( - run_id=self.run_id, - map_index=self.map_index, - ti_count=ti_count, - task=self.task, - relative=upstream, - session=session, - ) + assert self.task is not None + + # This value should never be None since we already know the current task + # is in a mapped task group, and should have been expanded, despite that, + # we need to check that it is not None to satisfy Mypy. + # But this value can be 0 when we expand an empty list, for that it is + # necessary to check that ti_count is not 0 to avoid dividing by 0. + if not ti_count: + return None + + # Find the innermost common mapped task group between the current task + # If the current task and the referenced task does not have a common + # mapped task group, the two are in different task mapping contexts + # (like another_task above), and we should use the "whole" value. + common_ancestor = _find_common_ancestor_mapped_group(self.task, upstream) + if common_ancestor is None: + return None + + # At this point we know the two tasks share a mapped task group, and we + # should use a "partial" value. Let's break down the mapped ti count + # between the ancestor and further expansion happened inside it. + + ancestor_ti_count = get_mapped_ti_count(common_ancestor, self.run_id, session=session) + ancestor_map_index = self.map_index * ancestor_ti_count // ti_count + + # If the task is NOT further expanded inside the common ancestor, we + # only want to reference one single ti. We must walk the actual DAG, + # and "ti_count == ancestor_ti_count" does not work, since the further + # expansion may be of length 1. + if not _is_further_mapped_inside(upstream, common_ancestor): + return ancestor_map_index + + # Otherwise we need a partial aggregation for values from selected task + # instances in the ancestor's expansion context. + further_count = ti_count // ancestor_ti_count + map_index_start = ancestor_map_index * further_count + return range(map_index_start, map_index_start + further_count) def clear_db_references(self, session: Session): """ @@ -2136,159 +2207,6 @@ def _is_further_mapped_inside(operator: Operator, container: SerializedTaskGroup return False -def _get_relevant_map_indexes( - *, - task: Operator, - run_id: str, - map_index: int, - relative: Operator, - ti_count: int | None, - session: Session, -) -> int | range | None: - """ - Infer the map indexes of a relative that's "relevant" to this ti. - - The bulk of the logic mainly exists to solve the problem described by - the following example, where 'val' must resolve to different values, - depending on where the reference is being used:: - - @task - def this_task(v): # This is self.task. - return v * 2 - - - @task_group - def tg1(inp): - val = upstream(inp) # This is the upstream task. - this_task(val) # When inp is 1, val here should resolve to 2. - return val - - - # This val is the same object returned by tg1. - val = tg1.expand(inp=[1, 2, 3]) - - - @task_group - def tg2(inp): - another_task(inp, val) # val here should resolve to [2, 4, 6]. - - - tg2.expand(inp=["a", "b"]) - - The surrounding mapped task groups of ``upstream`` and ``task`` are - inspected to find a common "ancestor". If such an ancestor is found, - we need to return specific map indexes to pull a partial value from - upstream XCom. - - The same logic apply for finding downstream tasks. - - :param task: Current task being inspected. - :param run_id: Current run ID. - :param map_index: Map index of the current task instance. - :param relative: The relative task to find relevant map indexes for. - :param ti_count: The total count of task instance this task was expanded - by the scheduler, i.e. ``expanded_ti_count`` in the template context. - :return: Specific map index or map indexes to pull, or ``None`` if we - want to "whole" return value (i.e. no mapped task groups involved). - """ - from airflow.models.mappedoperator import get_mapped_ti_count - - # This value should never be None since we already know the current task - # is in a mapped task group, and should have been expanded, despite that, - # we need to check that it is not None to satisfy Mypy. - # But this value can be 0 when we expand an empty list, for that it is - # necessary to check that ti_count is not 0 to avoid dividing by 0. - if not ti_count: - return None - - # Find the innermost common mapped task group between the current task - # If the current task and the referenced task does not have a common - # mapped task group, the two are in different task mapping contexts - # (like another_task above), and we should use the "whole" value. - if (common_ancestor := _find_common_ancestor_mapped_group(task, relative)) is None: - return None - - # At this point we know the two tasks share a mapped task group, and we - # should use a "partial" value. Let's break down the mapped ti count - # between the ancestor and further expansion happened inside it. - - ancestor_ti_count = get_mapped_ti_count(common_ancestor, run_id, session=session) - ancestor_map_index = map_index * ancestor_ti_count // ti_count - - # If the task is NOT further expanded inside the common ancestor, we - # only want to reference one single ti. We must walk the actual DAG, - # and "ti_count == ancestor_ti_count" does not work, since the further - # expansion may be of length 1. - if not _is_further_mapped_inside(relative, common_ancestor): - return ancestor_map_index - - # Otherwise we need a partial aggregation for values from selected task - # instances in the ancestor's expansion context. - further_count = ti_count // ancestor_ti_count - map_index_start = ancestor_map_index * further_count - return range(map_index_start, map_index_start + further_count) - - -def find_relevant_relatives( - normal_tasks: Iterable[str], - mapped_tasks: Iterable[tuple[str, int]], - *, - direction: Literal["upstream", "downstream"], - dag: SerializedDAG, - run_id: str, - session: Session, -) -> Collection[str | tuple[str, int]]: - from airflow.models.mappedoperator import get_mapped_ti_count - - visited: set[str | tuple[str, int]] = set() - - def _visit_relevant_relatives_for_normal(task_ids: Iterable[str]) -> None: - partial_dag = dag.partial_subset( - task_ids=task_ids, - include_downstream=direction == "downstream", - include_upstream=direction == "upstream", - exclude_original=True, - ) - visited.update(partial_dag.task_dict) - - def _visit_relevant_relatives_for_mapped(mapped_tasks: Iterable[tuple[str, int]]) -> None: - for task_id, map_index in mapped_tasks: - task = dag.get_task(task_id) - ti_count = get_mapped_ti_count(task, run_id, session=session) - # TODO (GH-52141): This should return scheduler operator types, but - # currently get_flat_relatives is inherited from SDK DAGNode. - relatives = cast("Iterable[Operator]", task.get_flat_relatives(upstream=direction == "upstream")) - for relative in relatives: - if relative.task_id in visited: - continue - relative_map_indexes = _get_relevant_map_indexes( - task=task, - relative=relative, # type: ignore[arg-type] - run_id=run_id, - map_index=map_index, - ti_count=ti_count, - session=session, - ) - visiting_mapped: set[tuple[str, int]] = set() - visiting_normal: set[str] = set() - match relative_map_indexes: - case int(): - if (item := (relative.task_id, relative_map_indexes)) not in visited: - visiting_mapped.add(item) - case range(): - visiting_mapped.update((relative.task_id, i) for i in relative_map_indexes) - case None: - if (task_id := relative.task_id) not in visited: - visiting_normal.add(task_id) - _visit_relevant_relatives_for_normal(visiting_normal) - _visit_relevant_relatives_for_mapped(visiting_mapped) - visited.update(visiting_mapped, visiting_normal) - - _visit_relevant_relatives_for_normal(normal_tasks) - _visit_relevant_relatives_for_mapped(mapped_tasks) - return visited - - class TaskInstanceNote(Base): """For storage of arbitrary notes concerning the task instance.""" diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 41b81a3c65d58..6f743cd7d4bf6 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -2536,27 +2536,6 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint): 4, id="clear mapped tasks with and without map index", ), - pytest.param( - "example_task_group_mapping", - [ - { - "state": State.FAILED, - "map_indexes": (0, 1, 2), - }, - { - "state": State.FAILED, - "map_indexes": (0, 1, 2), - }, - ], - "example_task_group_mapping", - { - "task_ids": [["op.mul_2", 0]], - "dag_run_id": "TEST_DAG_RUN_ID", - "include_upstream": True, - }, - 2, - id="clear tasks in mapped task group", - ), ], ) def test_should_respond_200( diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 441d81de2b56b..4c7a0d9fd5c86 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -51,7 +51,6 @@ TaskInstance, TaskInstance as TI, TaskInstanceNote, - find_relevant_relatives, ) from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.models.taskmap import TaskMap @@ -3102,56 +3101,3 @@ def test_delete_dagversion_restricted_when_taskinstance_exists(dag_maker, sessio session.delete(version) with pytest.raises(IntegrityError): session.commit() - - -@pytest.mark.parametrize( - ("normal_tasks", "mapped_tasks", "expected"), - [ - # 4 is just a regular task so it depends on all its upstreams. - pytest.param(["4"], [], {"1", "2", "3"}, id="nonmapped"), - # 3 is a mapped; it depends on all tis of the mapped upstream 2. - pytest.param(["3"], [], {"1", "2"}, id="mapped-whole"), - # Every ti of a mapped task depends on all tis of the mapped upstream. - pytest.param([], [("3", 1)], {"1", "2"}, id="mapped-one"), - # Same as the (non-group) unmapped case, d depends on all upstreams. - pytest.param(["d"], [], {"a", "b", "c"}, id="group-nonmapped"), - # This specifies c tis in ALL mapped task groups, so all b tis are needed. - pytest.param(["c"], [], {"a", "b"}, id="group-mapped-whole"), - # This only specifies one c ti, so only one b ti from the same mapped instance is returned. - pytest.param([], [("c", 1)], {"a", ("b", 1)}, id="group-mapped-one"), - ], -) -def test_find_relevant_relatives(dag_maker, session, normal_tasks, mapped_tasks, expected): - # 1 -> 2[] -> 3[] -> 4 - # - # a -> " b --> c " -> d - # "== g[] ==" - with dag_maker(session=session) as dag: - t1 = EmptyOperator(task_id="1") - t2 = MockOperator.partial(task_id="2").expand(arg1=["x", "y"]) - t3 = MockOperator.partial(task_id="3").expand(arg1=["x", "y"]) - t4 = EmptyOperator(task_id="4") - t1 >> t2 >> t3 >> t4 - - ta = EmptyOperator(task_id="a") - - @task_group(prefix_group_id=False) - def g(v): - tb = MockOperator(task_id="b", arg1=v) - tc = MockOperator(task_id="c", arg1=v) - tb >> tc - - td = EmptyOperator(task_id="d") - ta >> g.expand(v=["x", "y", "z"]) >> td - - dr = dag_maker.create_dagrun(state="success") - - result = find_relevant_relatives( - normal_tasks=normal_tasks, - mapped_tasks=mapped_tasks, - direction="upstream", - dag=dag, - run_id=dr.run_id, - session=session, - ) - assert result == expected From 5f2ed48ee45307b2caaa38292ce367acfb0e2830 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 8 Dec 2025 10:27:56 +0100 Subject: [PATCH 3/6] [v3-1-test] Update release candidate commands (#59186) (#59187) Moving artifacts from the dist folder to svn was not working for both airflow and task sdk (cherry picked from commit 501f724b64f0d64736f1cffef7648eba485bb472) Co-authored-by: Ephraim Anierobi --- dev/README_RELEASE_AIRFLOW.md | 2 +- .../commands/release_candidate_command.py | 29 ++++++++++++++----- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/dev/README_RELEASE_AIRFLOW.md b/dev/README_RELEASE_AIRFLOW.md index 70bd555df1541..68bb1e2ff4dea 100644 --- a/dev/README_RELEASE_AIRFLOW.md +++ b/dev/README_RELEASE_AIRFLOW.md @@ -274,7 +274,7 @@ export GPG_TTY=$(tty) # Set Version export VERSION=3.1.3 export VERSION_SUFFIX=rc1 -export VERSION_RC=${VERSION}${VERSION_RC} +export VERSION_RC=${VERSION}${VERSION_SUFFIX} export VERSION_BRANCH=3-1 export TASK_SDK_VERSION=1.1.3 export TASK_SDK_VERSION_RC=${TASK_SDK_VERSION}${VERSION_SUFFIX} diff --git a/dev/breeze/src/airflow_breeze/commands/release_candidate_command.py b/dev/breeze/src/airflow_breeze/commands/release_candidate_command.py index 1f3da43717c53..e993209fe3bbf 100644 --- a/dev/breeze/src/airflow_breeze/commands/release_candidate_command.py +++ b/dev/breeze/src/airflow_breeze/commands/release_candidate_command.py @@ -456,13 +456,18 @@ def clone_asf_repo(version, repo_root): console_print("[success]Cloned ASF repo successfully") -def move_artifacts_to_svn(version, task_sdk_version, repo_root): +def move_artifacts_to_svn( + version, version_without_rc, task_sdk_version, task_sdk_version_without_rc, repo_root +): if confirm_action("Do you want to move artifacts to SVN?"): os.chdir(f"{repo_root}/asf-dist/dev/airflow") run_command(["svn", "mkdir", f"{version}"], check=True) - run_command(f"mv {repo_root}/dist/*{version}* {version}/", check=True, shell=True) + run_command(f"mv {repo_root}/dist/*{version_without_rc}* {version}/", check=True, shell=True) + run_command(["svn", "mkdir", f"task-sdk/{task_sdk_version}"]) run_command( - f"mv {repo_root}/dist/*{task_sdk_version}* task-sdk/{task_sdk_version}/", check=True, shell=True + f"mv {repo_root}/dist/*{task_sdk_version_without_rc}* task-sdk/{task_sdk_version}/", + check=True, + shell=True, ) console_print("[success]Moved artifacts to SVN:") run_command(["ls"]) @@ -470,16 +475,22 @@ def move_artifacts_to_svn(version, task_sdk_version, repo_root): run_command([f"ls task-sdk/{task_sdk_version}"]) -def push_artifacts_to_asf_repo(version, repo_root): +def push_artifacts_to_asf_repo(version, task_sdk_version, repo_root): if confirm_action("Do you want to push artifacts to ASF repo?"): - console_print("Files to push to svn:") + console_print("Airflow Version Files to push to svn:") if not get_dry_run(): os.chdir(f"{repo_root}/asf-dist/dev/airflow/{version}") run_command(["ls"]) confirm_action("Do you want to continue?", abort=True) run_command("svn add *", check=True, shell=True) + console_print("Task SDK Version Files to push to svn:") + if not get_dry_run(): + os.chdir(f"{repo_root}/asf-dist/dev/airflow/task-sdk/{task_sdk_version}") + run_command(["ls"]) + confirm_action("Do you want to continue?", abort=True) + run_command("svn add *", check=True, shell=True) run_command( - ["svn", "commit", "-m", f"Add artifacts for Airflow {version}"], + ["svn", "commit", "-m", f"Add artifacts for Airflow {version} and Task SDK {task_sdk_version}"], check=True, ) console_print("[success]Files pushed to svn") @@ -738,9 +749,11 @@ def publish_release_candidate( # Clone the asf repo clone_asf_repo(version, airflow_repo_root) # Move artifacts to SVN - move_artifacts_to_svn(version, task_sdk_version, airflow_repo_root) + move_artifacts_to_svn( + version, version_without_rc, task_sdk_version, task_sdk_version_without_rc, airflow_repo_root + ) # Push the artifacts to the asf repo - push_artifacts_to_asf_repo(version, airflow_repo_root) + push_artifacts_to_asf_repo(version, task_sdk_version, airflow_repo_root) # Remove old releases remove_old_releases(version, airflow_repo_root) From 42984c5926cd04a2b81b86eea3ccefc3286ed5ab Mon Sep 17 00:00:00 2001 From: Vincent <97131062+vincbeck@users.noreply.github.com> Date: Mon, 20 Oct 2025 16:25:15 -0400 Subject: [PATCH 4/6] Handle invalid token in `JWTRefreshMiddleware` (#56904) (cherry picked from commit d0e6222ef8cabcd4c4add4baf9a0eab7172e5ada) --- .../api_fastapi/auth/middlewares/refresh_token.py | 7 +++++-- .../auth/middlewares/test_refresh_token.py | 14 +++++++++++++- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py b/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py index f304eb9517f65..81ed844873458 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py +++ b/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py @@ -17,7 +17,7 @@ # under the License. from __future__ import annotations -from fastapi import Request +from fastapi import HTTPException, Request from starlette.middleware.base import BaseHTTPMiddleware from airflow.api_fastapi.app import get_auth_manager @@ -64,5 +64,8 @@ async def dispatch(self, request: Request, call_next): @staticmethod async def _refresh_user(current_token: str) -> BaseUser | None: - user = await resolve_user_from_token(current_token) + try: + user = await resolve_user_from_token(current_token) + except HTTPException: + return None return get_auth_manager().refresh_user(user=user) diff --git a/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py b/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py index 87648a2be2b6c..e87b7c3fd2f15 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py +++ b/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py @@ -20,7 +20,7 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest -from fastapi import Request, Response +from fastapi import HTTPException, Request, Response from airflow.api_fastapi.auth.managers.base_auth_manager import COOKIE_NAME_JWT_TOKEN from airflow.api_fastapi.auth.managers.models.base_user import BaseUser @@ -72,6 +72,18 @@ async def test_dispatch_no_refreshed_token( mock_resolve_user_from_token.assert_called_once_with("valid_token") mock_auth_manager.generate_jwt.assert_not_called() + @patch("airflow.api_fastapi.auth.middlewares.refresh_token.resolve_user_from_token") + @pytest.mark.asyncio + async def test_dispatch_expired_token(self, mock_resolve_user_from_token, middleware, mock_request): + mock_request.cookies = {COOKIE_NAME_JWT_TOKEN: "invalid_token"} + mock_resolve_user_from_token.side_effect = HTTPException(status_code=403) + + call_next = AsyncMock(return_value=Response()) + await middleware.dispatch(mock_request, call_next) + + call_next.assert_called_once_with(mock_request) + mock_resolve_user_from_token.assert_called_once_with("invalid_token") + @pytest.mark.asyncio @patch("airflow.api_fastapi.auth.middlewares.refresh_token.get_auth_manager") @patch("airflow.api_fastapi.auth.middlewares.refresh_token.resolve_user_from_token") From a67604716d7e933cfae65377e0dbdcbec4be05f2 Mon Sep 17 00:00:00 2001 From: Pierre Jeambrun Date: Tue, 21 Oct 2025 14:47:06 +0200 Subject: [PATCH 5/6] Fix refresh token middleware error handling (#56892) (cherry picked from commit d7174df9b813996a6e21c49707a626e51bdb98ce) --- .../auth/middlewares/refresh_token.py | 38 ++++++++++--------- .../auth/middlewares/test_refresh_token.py | 14 +++++++ 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py b/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py index 81ed844873458..5705d14ba994a 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py +++ b/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py @@ -18,6 +18,7 @@ from __future__ import annotations from fastapi import HTTPException, Request +from fastapi.responses import JSONResponse from starlette.middleware.base import BaseHTTPMiddleware from airflow.api_fastapi.app import get_auth_manager @@ -41,25 +42,28 @@ class JWTRefreshMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): new_user = None current_token = request.cookies.get(COOKIE_NAME_JWT_TOKEN) - if current_token: - new_user = await self._refresh_user(current_token) - if new_user: - request.state.user = new_user - - response = await call_next(request) + try: + if current_token: + new_user = await self._refresh_user(current_token) + if new_user: + request.state.user = new_user - if new_user: - # If we created a new user, serialize it and set it as a cookie - new_token = get_auth_manager().generate_jwt(new_user) - secure = bool(conf.get("api", "ssl_cert", fallback="")) - response.set_cookie( - COOKIE_NAME_JWT_TOKEN, - new_token, - httponly=True, - secure=secure, - samesite="lax", - ) + response = await call_next(request) + if new_user: + # If we created a new user, serialize it and set it as a cookie + new_token = get_auth_manager().generate_jwt(new_user) + secure = bool(conf.get("api", "ssl_cert", fallback="")) + response.set_cookie( + COOKIE_NAME_JWT_TOKEN, + new_token, + httponly=True, + secure=secure, + samesite="lax", + ) + except HTTPException as exc: + # If any HTTPException is raised during user resolution or refresh, return it as response + return JSONResponse(status_code=exc.status_code, content={"detail": exc.detail}) return response @staticmethod diff --git a/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py b/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py index e87b7c3fd2f15..834f864e40977 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py +++ b/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py @@ -53,6 +53,20 @@ async def test_dispatch_no_token(self, mock_refresh_user, middleware, mock_reque call_next.assert_called_once_with(mock_request) mock_refresh_user.assert_not_called() + @patch.object( + JWTRefreshMiddleware, + "_refresh_user", + side_effect=HTTPException(status_code=403, detail="Invalid JWT token"), + ) + @pytest.mark.asyncio + async def test_dispatch_invalid_token(self, mock_refresh_user, middleware, mock_request): + mock_request.cookies = {COOKIE_NAME_JWT_TOKEN: "valid_token"} + call_next = AsyncMock(return_value=Response()) + + response = await middleware.dispatch(mock_request, call_next) + assert response.status_code == 403 + assert response.body == b'{"detail":"Invalid JWT token"}' + @patch("airflow.api_fastapi.auth.middlewares.refresh_token.get_auth_manager") @patch("airflow.api_fastapi.auth.middlewares.refresh_token.resolve_user_from_token") @pytest.mark.asyncio From 60f6b2ca98525c7f7493a6944c39eef8d58556e8 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Mon, 8 Dec 2025 17:42:55 +0100 Subject: [PATCH 6/6] Update RELEASE_NOTES.rst for rc2 --- RELEASE_NOTES.rst | 5 +++-- reproducible_build.yaml | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst index f0bc530ba8570..9ba9e0c10d59d 100644 --- a/RELEASE_NOTES.rst +++ b/RELEASE_NOTES.rst @@ -24,7 +24,7 @@ .. towncrier release notes start -Airflow 3.1.4 (2025-12-09) +Airflow 3.1.4 (2025-12-10) -------------------------- Significant Changes @@ -35,7 +35,8 @@ No significant changes. Bug Fixes ^^^^^^^^^ -Fix task clearing to only find relevant upstream/downstream task instances (#58987) +Handle invalid token in JWTRefreshMiddleware (#56904) +Fix inconsistent Dag hashes when template fields contain unordered dicts (#59091) (#59175) Fix assets used only as inlets being incorrectly orphaned (#58986) Fix exception when logging stdout with a custom %-format string (#58963) Fix backfill max_active_runs race condition with concurrent schedulers (#58935) diff --git a/reproducible_build.yaml b/reproducible_build.yaml index 4bff1d7b12507..e415d73e96460 100644 --- a/reproducible_build.yaml +++ b/reproducible_build.yaml @@ -1,2 +1,2 @@ -release-notes-hash: a61b70f4f9325bdd2db9fa8495474a39 -source-date-epoch: 1764766576 +release-notes-hash: 1cef4f069cb23b395098f97d070d8d0a +source-date-epoch: 1765212168