diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst index f0bc530ba8570..9ba9e0c10d59d 100644 --- a/RELEASE_NOTES.rst +++ b/RELEASE_NOTES.rst @@ -24,7 +24,7 @@ .. towncrier release notes start -Airflow 3.1.4 (2025-12-09) +Airflow 3.1.4 (2025-12-10) -------------------------- Significant Changes @@ -35,7 +35,8 @@ No significant changes. Bug Fixes ^^^^^^^^^ -Fix task clearing to only find relevant upstream/downstream task instances (#58987) +Handle invalid token in JWTRefreshMiddleware (#56904) +Fix inconsistent Dag hashes when template fields contain unordered dicts (#59091) (#59175) Fix assets used only as inlets being incorrectly orphaned (#58986) Fix exception when logging stdout with a custom %-format string (#58963) Fix backfill max_active_runs race condition with concurrent schedulers (#58935) diff --git a/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py b/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py index f304eb9517f65..5705d14ba994a 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py +++ b/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py @@ -17,7 +17,8 @@ # under the License. from __future__ import annotations -from fastapi import Request +from fastapi import HTTPException, Request +from fastapi.responses import JSONResponse from starlette.middleware.base import BaseHTTPMiddleware from airflow.api_fastapi.app import get_auth_manager @@ -41,28 +42,34 @@ class JWTRefreshMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): new_user = None current_token = request.cookies.get(COOKIE_NAME_JWT_TOKEN) - if current_token: - new_user = await self._refresh_user(current_token) - if new_user: - request.state.user = new_user - - response = await call_next(request) + try: + if current_token: + new_user = await self._refresh_user(current_token) + if new_user: + request.state.user = new_user - if new_user: - # If we created a new user, serialize it and set it as a cookie - new_token = get_auth_manager().generate_jwt(new_user) - secure = bool(conf.get("api", "ssl_cert", fallback="")) - response.set_cookie( - COOKIE_NAME_JWT_TOKEN, - new_token, - httponly=True, - secure=secure, - samesite="lax", - ) + response = await call_next(request) + if new_user: + # If we created a new user, serialize it and set it as a cookie + new_token = get_auth_manager().generate_jwt(new_user) + secure = bool(conf.get("api", "ssl_cert", fallback="")) + response.set_cookie( + COOKIE_NAME_JWT_TOKEN, + new_token, + httponly=True, + secure=secure, + samesite="lax", + ) + except HTTPException as exc: + # If any HTTPException is raised during user resolution or refresh, return it as response + return JSONResponse(status_code=exc.status_code, content={"detail": exc.detail}) return response @staticmethod async def _refresh_user(current_token: str) -> BaseUser | None: - user = await resolve_user_from_token(current_token) + try: + user = await resolve_user_from_token(current_token) + except HTTPException: + return None return get_auth_manager().refresh_user(user=user) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 82e7fad3493bb..68764b5456aa6 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -729,54 +729,32 @@ def post_clear_task_instances( if future: body.end_date = None - if (task_markers_to_clear := body.task_ids) is not None: - mapped_tasks_tuples = {t for t in task_markers_to_clear if isinstance(t, tuple)} + task_ids = body.task_ids + if task_ids is not None: + tasks = set(task_ids) + mapped_tasks_tuples = set(t for t in tasks if isinstance(t, tuple)) # Unmapped tasks are expressed in their task_ids (without map_indexes) - normal_task_ids = {t for t in task_markers_to_clear if not isinstance(t, tuple)} - - def _collect_relatives(run_id: str, direction: Literal["upstream", "downstream"]) -> None: - from airflow.models.taskinstance import find_relevant_relatives - - relevant_relatives = find_relevant_relatives( - normal_task_ids, - mapped_tasks_tuples, - dag=dag, - run_id=run_id, - direction=direction, - session=session, + unmapped_task_ids = set(t for t in tasks if not isinstance(t, tuple)) + + if upstream or downstream: + mapped_task_ids = set(tid for tid, _ in mapped_tasks_tuples) + relatives = dag.partial_subset( + task_ids=unmapped_task_ids | mapped_task_ids, + include_downstream=downstream, + include_upstream=upstream, + exclude_original=True, ) - normal_task_ids.update(t for t in relevant_relatives if not isinstance(t, tuple)) - mapped_tasks_tuples.update(t for t in relevant_relatives if isinstance(t, tuple)) - - # We can't easily calculate upstream/downstream map indexes when not - # working for a specific dag run. It's possible by looking at the runs - # one by one, but that is both resource-consuming and logically complex. - # So instead we'll just clear all the tis based on task ID and hope - # that's good enough for most cases. - if dag_run_id is None: - if upstream or downstream: - partial_dag = dag.partial_subset( - task_ids=normal_task_ids.union(tid for tid, _ in mapped_tasks_tuples), - include_downstream=downstream, - include_upstream=upstream, - exclude_original=True, - ) - normal_task_ids.update(partial_dag.task_dict) - else: - if upstream: - _collect_relatives(dag_run_id, "upstream") - if downstream: - _collect_relatives(dag_run_id, "downstream") - - task_markers_to_clear = [ - *normal_task_ids, - *((t, m) for t, m in mapped_tasks_tuples if t not in normal_task_ids), + unmapped_task_ids = unmapped_task_ids | set(relatives.task_dict.keys()) + + mapped_tasks_list = [ + (tid, map_id) for tid, map_id in mapped_tasks_tuples if tid not in unmapped_task_ids ] + task_ids = mapped_tasks_list + list(unmapped_task_ids) # Prepare common parameters common_params = { "dry_run": True, - "task_ids": task_markers_to_clear, + "task_ids": task_ids, "session": session, "run_on_latest_version": body.run_on_latest_version, "only_failed": body.only_failed, diff --git a/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py b/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py index c7b3a02301daa..750c3da1ec17b 100644 --- a/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py +++ b/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py @@ -22,9 +22,9 @@ # [START example_dynamic_task_mapping] from datetime import datetime -from airflow.sdk import DAG, task, task_group +from airflow.sdk import DAG, task -with DAG(dag_id="example_dynamic_task_mapping", schedule=None, start_date=datetime(2022, 3, 4)): +with DAG(dag_id="example_dynamic_task_mapping", schedule=None, start_date=datetime(2022, 3, 4)) as dag: @task def add_one(x: int): @@ -39,11 +39,8 @@ def sum_it(values): sum_it(added_values) with DAG( - dag_id="example_task_mapping_second_order", - schedule=None, - catchup=False, - start_date=datetime(2022, 3, 4), -): + dag_id="example_task_mapping_second_order", schedule=None, catchup=False, start_date=datetime(2022, 3, 4) +) as dag2: @task def get_nums(): @@ -61,25 +58,4 @@ def add_10(num): _times_2 = times_2.expand(num=_get_nums) add_10.expand(num=_times_2) -with DAG( - dag_id="example_task_group_mapping", - schedule=None, - catchup=False, - start_date=datetime(2022, 3, 4), -): - - @task_group - def op(num): - @task - def add_1(num): - return num + 1 - - @task - def mul_2(num): - return num * 2 - - return mul_2(add_1(num)) - - op.expand(num=[1, 2, 3]) - # [END example_dynamic_task_mapping] diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 06597a24fc93d..216bc9b1bcad6 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -28,7 +28,7 @@ from collections.abc import Collection, Iterable from datetime import timedelta from functools import cache -from typing import TYPE_CHECKING, Any, cast +from typing import TYPE_CHECKING, Any from urllib.parse import quote import attrs @@ -121,7 +121,7 @@ from airflow.sdk.definitions.asset import AssetUniqueKey from airflow.sdk.types import RuntimeTaskInstanceProtocol from airflow.serialization.definitions.taskgroup import SerializedTaskGroup - from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG + from airflow.serialization.serialized_objects import SerializedBaseOperator from airflow.utils.context import Context Operator: TypeAlias = MappedOperator | SerializedBaseOperator @@ -2037,16 +2037,87 @@ def get_relevant_upstream_map_indexes( *, session: Session, ) -> int | range | None: + """ + Infer the map indexes of an upstream "relevant" to this ti. + + The bulk of the logic mainly exists to solve the problem described by + the following example, where 'val' must resolve to different values, + depending on where the reference is being used:: + + @task + def this_task(v): # This is self.task. + return v * 2 + + + @task_group + def tg1(inp): + val = upstream(inp) # This is the upstream task. + this_task(val) # When inp is 1, val here should resolve to 2. + return val + + + # This val is the same object returned by tg1. + val = tg1.expand(inp=[1, 2, 3]) + + + @task_group + def tg2(inp): + another_task(inp, val) # val here should resolve to [2, 4, 6]. + + + tg2.expand(inp=["a", "b"]) + + The surrounding mapped task groups of ``upstream`` and ``self.task`` are + inspected to find a common "ancestor". If such an ancestor is found, + we need to return specific map indexes to pull a partial value from + upstream XCom. + + :param upstream: The referenced upstream task. + :param ti_count: The total count of task instance this task was expanded + by the scheduler, i.e. ``expanded_ti_count`` in the template context. + :return: Specific map index or map indexes to pull, or ``None`` if we + want to "whole" return value (i.e. no mapped task groups involved). + """ + from airflow.models.mappedoperator import get_mapped_ti_count + if TYPE_CHECKING: - assert self.task - return _get_relevant_map_indexes( - run_id=self.run_id, - map_index=self.map_index, - ti_count=ti_count, - task=self.task, - relative=upstream, - session=session, - ) + assert self.task is not None + + # This value should never be None since we already know the current task + # is in a mapped task group, and should have been expanded, despite that, + # we need to check that it is not None to satisfy Mypy. + # But this value can be 0 when we expand an empty list, for that it is + # necessary to check that ti_count is not 0 to avoid dividing by 0. + if not ti_count: + return None + + # Find the innermost common mapped task group between the current task + # If the current task and the referenced task does not have a common + # mapped task group, the two are in different task mapping contexts + # (like another_task above), and we should use the "whole" value. + common_ancestor = _find_common_ancestor_mapped_group(self.task, upstream) + if common_ancestor is None: + return None + + # At this point we know the two tasks share a mapped task group, and we + # should use a "partial" value. Let's break down the mapped ti count + # between the ancestor and further expansion happened inside it. + + ancestor_ti_count = get_mapped_ti_count(common_ancestor, self.run_id, session=session) + ancestor_map_index = self.map_index * ancestor_ti_count // ti_count + + # If the task is NOT further expanded inside the common ancestor, we + # only want to reference one single ti. We must walk the actual DAG, + # and "ti_count == ancestor_ti_count" does not work, since the further + # expansion may be of length 1. + if not _is_further_mapped_inside(upstream, common_ancestor): + return ancestor_map_index + + # Otherwise we need a partial aggregation for values from selected task + # instances in the ancestor's expansion context. + further_count = ti_count // ancestor_ti_count + map_index_start = ancestor_map_index * further_count + return range(map_index_start, map_index_start + further_count) def clear_db_references(self, session: Session): """ @@ -2136,159 +2207,6 @@ def _is_further_mapped_inside(operator: Operator, container: SerializedTaskGroup return False -def _get_relevant_map_indexes( - *, - task: Operator, - run_id: str, - map_index: int, - relative: Operator, - ti_count: int | None, - session: Session, -) -> int | range | None: - """ - Infer the map indexes of a relative that's "relevant" to this ti. - - The bulk of the logic mainly exists to solve the problem described by - the following example, where 'val' must resolve to different values, - depending on where the reference is being used:: - - @task - def this_task(v): # This is self.task. - return v * 2 - - - @task_group - def tg1(inp): - val = upstream(inp) # This is the upstream task. - this_task(val) # When inp is 1, val here should resolve to 2. - return val - - - # This val is the same object returned by tg1. - val = tg1.expand(inp=[1, 2, 3]) - - - @task_group - def tg2(inp): - another_task(inp, val) # val here should resolve to [2, 4, 6]. - - - tg2.expand(inp=["a", "b"]) - - The surrounding mapped task groups of ``upstream`` and ``task`` are - inspected to find a common "ancestor". If such an ancestor is found, - we need to return specific map indexes to pull a partial value from - upstream XCom. - - The same logic apply for finding downstream tasks. - - :param task: Current task being inspected. - :param run_id: Current run ID. - :param map_index: Map index of the current task instance. - :param relative: The relative task to find relevant map indexes for. - :param ti_count: The total count of task instance this task was expanded - by the scheduler, i.e. ``expanded_ti_count`` in the template context. - :return: Specific map index or map indexes to pull, or ``None`` if we - want to "whole" return value (i.e. no mapped task groups involved). - """ - from airflow.models.mappedoperator import get_mapped_ti_count - - # This value should never be None since we already know the current task - # is in a mapped task group, and should have been expanded, despite that, - # we need to check that it is not None to satisfy Mypy. - # But this value can be 0 when we expand an empty list, for that it is - # necessary to check that ti_count is not 0 to avoid dividing by 0. - if not ti_count: - return None - - # Find the innermost common mapped task group between the current task - # If the current task and the referenced task does not have a common - # mapped task group, the two are in different task mapping contexts - # (like another_task above), and we should use the "whole" value. - if (common_ancestor := _find_common_ancestor_mapped_group(task, relative)) is None: - return None - - # At this point we know the two tasks share a mapped task group, and we - # should use a "partial" value. Let's break down the mapped ti count - # between the ancestor and further expansion happened inside it. - - ancestor_ti_count = get_mapped_ti_count(common_ancestor, run_id, session=session) - ancestor_map_index = map_index * ancestor_ti_count // ti_count - - # If the task is NOT further expanded inside the common ancestor, we - # only want to reference one single ti. We must walk the actual DAG, - # and "ti_count == ancestor_ti_count" does not work, since the further - # expansion may be of length 1. - if not _is_further_mapped_inside(relative, common_ancestor): - return ancestor_map_index - - # Otherwise we need a partial aggregation for values from selected task - # instances in the ancestor's expansion context. - further_count = ti_count // ancestor_ti_count - map_index_start = ancestor_map_index * further_count - return range(map_index_start, map_index_start + further_count) - - -def find_relevant_relatives( - normal_tasks: Iterable[str], - mapped_tasks: Iterable[tuple[str, int]], - *, - direction: Literal["upstream", "downstream"], - dag: SerializedDAG, - run_id: str, - session: Session, -) -> Collection[str | tuple[str, int]]: - from airflow.models.mappedoperator import get_mapped_ti_count - - visited: set[str | tuple[str, int]] = set() - - def _visit_relevant_relatives_for_normal(task_ids: Iterable[str]) -> None: - partial_dag = dag.partial_subset( - task_ids=task_ids, - include_downstream=direction == "downstream", - include_upstream=direction == "upstream", - exclude_original=True, - ) - visited.update(partial_dag.task_dict) - - def _visit_relevant_relatives_for_mapped(mapped_tasks: Iterable[tuple[str, int]]) -> None: - for task_id, map_index in mapped_tasks: - task = dag.get_task(task_id) - ti_count = get_mapped_ti_count(task, run_id, session=session) - # TODO (GH-52141): This should return scheduler operator types, but - # currently get_flat_relatives is inherited from SDK DAGNode. - relatives = cast("Iterable[Operator]", task.get_flat_relatives(upstream=direction == "upstream")) - for relative in relatives: - if relative.task_id in visited: - continue - relative_map_indexes = _get_relevant_map_indexes( - task=task, - relative=relative, # type: ignore[arg-type] - run_id=run_id, - map_index=map_index, - ti_count=ti_count, - session=session, - ) - visiting_mapped: set[tuple[str, int]] = set() - visiting_normal: set[str] = set() - match relative_map_indexes: - case int(): - if (item := (relative.task_id, relative_map_indexes)) not in visited: - visiting_mapped.add(item) - case range(): - visiting_mapped.update((relative.task_id, i) for i in relative_map_indexes) - case None: - if (task_id := relative.task_id) not in visited: - visiting_normal.add(task_id) - _visit_relevant_relatives_for_normal(visiting_normal) - _visit_relevant_relatives_for_mapped(visiting_mapped) - visited.update(visiting_mapped, visiting_normal) - - _visit_relevant_relatives_for_normal(normal_tasks) - _visit_relevant_relatives_for_mapped(mapped_tasks) - return visited - - class TaskInstanceNote(Base): """For storage of arbitrary notes concerning the task instance.""" diff --git a/airflow-core/src/airflow/serialization/helpers.py b/airflow-core/src/airflow/serialization/helpers.py index 949b3cb9c9f09..5f564df6188ed 100644 --- a/airflow-core/src/airflow/serialization/helpers.py +++ b/airflow-core/src/airflow/serialization/helpers.py @@ -51,6 +51,16 @@ def translate_tuples_to_lists(obj: Any): return {key: translate_tuples_to_lists(value) for key, value in obj.items()} return obj + def sort_dict_recursively(obj: Any) -> Any: + """Recursively sort dictionaries to ensure consistent ordering.""" + if isinstance(obj, dict): + return {k: sort_dict_recursively(v) for k, v in sorted(obj.items())} + if isinstance(obj, list): + return [sort_dict_recursively(item) for item in obj] + if isinstance(obj, tuple): + return tuple(sort_dict_recursively(item) for item in obj) + return obj + max_length = conf.getint("core", "max_templated_field_length") if not is_jsonable(template_field): @@ -70,6 +80,10 @@ def translate_tuples_to_lists(obj: Any): # and need to be converted to lists return template_field template_field = translate_tuples_to_lists(template_field) + # Sort dictionaries recursively to ensure consistent string representation + # This prevents hash inconsistencies when dict ordering varies + if isinstance(template_field, dict): + template_field = sort_dict_recursively(template_field) serialized = str(template_field) if len(serialized) > max_length: rendered = redact(serialized, name) diff --git a/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py b/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py index 87648a2be2b6c..834f864e40977 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py +++ b/airflow-core/tests/unit/api_fastapi/auth/middlewares/test_refresh_token.py @@ -20,7 +20,7 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest -from fastapi import Request, Response +from fastapi import HTTPException, Request, Response from airflow.api_fastapi.auth.managers.base_auth_manager import COOKIE_NAME_JWT_TOKEN from airflow.api_fastapi.auth.managers.models.base_user import BaseUser @@ -53,6 +53,20 @@ async def test_dispatch_no_token(self, mock_refresh_user, middleware, mock_reque call_next.assert_called_once_with(mock_request) mock_refresh_user.assert_not_called() + @patch.object( + JWTRefreshMiddleware, + "_refresh_user", + side_effect=HTTPException(status_code=403, detail="Invalid JWT token"), + ) + @pytest.mark.asyncio + async def test_dispatch_invalid_token(self, mock_refresh_user, middleware, mock_request): + mock_request.cookies = {COOKIE_NAME_JWT_TOKEN: "valid_token"} + call_next = AsyncMock(return_value=Response()) + + response = await middleware.dispatch(mock_request, call_next) + assert response.status_code == 403 + assert response.body == b'{"detail":"Invalid JWT token"}' + @patch("airflow.api_fastapi.auth.middlewares.refresh_token.get_auth_manager") @patch("airflow.api_fastapi.auth.middlewares.refresh_token.resolve_user_from_token") @pytest.mark.asyncio @@ -72,6 +86,18 @@ async def test_dispatch_no_refreshed_token( mock_resolve_user_from_token.assert_called_once_with("valid_token") mock_auth_manager.generate_jwt.assert_not_called() + @patch("airflow.api_fastapi.auth.middlewares.refresh_token.resolve_user_from_token") + @pytest.mark.asyncio + async def test_dispatch_expired_token(self, mock_resolve_user_from_token, middleware, mock_request): + mock_request.cookies = {COOKIE_NAME_JWT_TOKEN: "invalid_token"} + mock_resolve_user_from_token.side_effect = HTTPException(status_code=403) + + call_next = AsyncMock(return_value=Response()) + await middleware.dispatch(mock_request, call_next) + + call_next.assert_called_once_with(mock_request) + mock_resolve_user_from_token.assert_called_once_with("invalid_token") + @pytest.mark.asyncio @patch("airflow.api_fastapi.auth.middlewares.refresh_token.get_auth_manager") @patch("airflow.api_fastapi.auth.middlewares.refresh_token.resolve_user_from_token") diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 41b81a3c65d58..6f743cd7d4bf6 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -2536,27 +2536,6 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint): 4, id="clear mapped tasks with and without map index", ), - pytest.param( - "example_task_group_mapping", - [ - { - "state": State.FAILED, - "map_indexes": (0, 1, 2), - }, - { - "state": State.FAILED, - "map_indexes": (0, 1, 2), - }, - ], - "example_task_group_mapping", - { - "task_ids": [["op.mul_2", 0]], - "dag_run_id": "TEST_DAG_RUN_ID", - "include_upstream": True, - }, - 2, - id="clear tasks in mapped task group", - ), ], ) def test_should_respond_200( diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index 92446cee2189c..a81ecaf13fcc8 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -545,6 +545,59 @@ def test_hash_method_removes_fileloc_and_remains_consistent(self): assert "fileloc" in test_data["dag"] assert test_data["dag"]["fileloc"] == "/different/path/to/dag.py" + def test_hash_method_consistent_with_dict_ordering_in_template_fields(self, dag_maker): + from airflow.sdk.bases.operator import BaseOperator + + class MyCustomOp(BaseOperator): + template_fields = ("env_vars",) + + def __init__(self, *, task_id: str, **kwargs): + super().__init__(task_id=task_id, **kwargs) + self.env_vars = {"KEY1": "value1", "KEY2": "value2", "KEY3": "value3"} + + # Create first DAG with env_vars in one order + with dag_maker("test_dag") as dag1: + MyCustomOp(task_id="task1") + + serialized_dag_1 = SerializedDAG.to_dict(dag1) + + # Create second DAG with env_vars in different order + with dag_maker("test_dag") as dag2: + task = MyCustomOp(task_id="task1") + # Recreate dict with different insertion order + task.env_vars = {"KEY3": "value3", "KEY1": "value1", "KEY2": "value2"} + + serialized_dag_2 = SerializedDAG.to_dict(dag2) + + # Verify that the original env_vars have different ordering + env_vars_1 = None + env_vars_2 = None + for task in serialized_dag_1["dag"]["tasks"]: + if task["__var"]["task_id"] == "task1": + env_vars_1 = task["__var"].get("env_vars") + for task in serialized_dag_2["dag"]["tasks"]: + if task["__var"]["task_id"] == "task1": + env_vars_2 = task["__var"].get("env_vars") + + assert env_vars_1 is not None, "serialized_dag_1 should have env_vars" + assert env_vars_2 is not None, "serialized_dag_2 should have env_vars" + # The serialized env_vars should be sorted dicts (or strings if truncated) + # If they're dicts, verify they're sorted; if strings, they should be equal due to sorting + if isinstance(env_vars_1, dict) and isinstance(env_vars_2, dict): + # Both should be sorted dictionaries with same content + assert list(env_vars_1.keys()) == sorted(env_vars_1.keys()) + assert list(env_vars_2.keys()) == sorted(env_vars_2.keys()) + assert env_vars_1 == env_vars_2, "Sorted dicts should be equal regardless of original order" + elif isinstance(env_vars_1, str) and isinstance(env_vars_2, str): + # If truncated to strings, they should be equal due to sorting + assert env_vars_1 == env_vars_2, "String representations should be equal due to sorting" + + hash_1 = SDM.hash(serialized_dag_1) + hash_2 = SDM.hash(serialized_dag_2) + + # Hashes should be identical + assert hash_1 == hash_2, "Hashes should be identical when dicts are sorted consistently" + def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session): """ Test that dynamic DAG update gracefully handles case where SerializedDagModel doesn't exist. diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 441d81de2b56b..4c7a0d9fd5c86 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -51,7 +51,6 @@ TaskInstance, TaskInstance as TI, TaskInstanceNote, - find_relevant_relatives, ) from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.models.taskmap import TaskMap @@ -3102,56 +3101,3 @@ def test_delete_dagversion_restricted_when_taskinstance_exists(dag_maker, sessio session.delete(version) with pytest.raises(IntegrityError): session.commit() - - -@pytest.mark.parametrize( - ("normal_tasks", "mapped_tasks", "expected"), - [ - # 4 is just a regular task so it depends on all its upstreams. - pytest.param(["4"], [], {"1", "2", "3"}, id="nonmapped"), - # 3 is a mapped; it depends on all tis of the mapped upstream 2. - pytest.param(["3"], [], {"1", "2"}, id="mapped-whole"), - # Every ti of a mapped task depends on all tis of the mapped upstream. - pytest.param([], [("3", 1)], {"1", "2"}, id="mapped-one"), - # Same as the (non-group) unmapped case, d depends on all upstreams. - pytest.param(["d"], [], {"a", "b", "c"}, id="group-nonmapped"), - # This specifies c tis in ALL mapped task groups, so all b tis are needed. - pytest.param(["c"], [], {"a", "b"}, id="group-mapped-whole"), - # This only specifies one c ti, so only one b ti from the same mapped instance is returned. - pytest.param([], [("c", 1)], {"a", ("b", 1)}, id="group-mapped-one"), - ], -) -def test_find_relevant_relatives(dag_maker, session, normal_tasks, mapped_tasks, expected): - # 1 -> 2[] -> 3[] -> 4 - # - # a -> " b --> c " -> d - # "== g[] ==" - with dag_maker(session=session) as dag: - t1 = EmptyOperator(task_id="1") - t2 = MockOperator.partial(task_id="2").expand(arg1=["x", "y"]) - t3 = MockOperator.partial(task_id="3").expand(arg1=["x", "y"]) - t4 = EmptyOperator(task_id="4") - t1 >> t2 >> t3 >> t4 - - ta = EmptyOperator(task_id="a") - - @task_group(prefix_group_id=False) - def g(v): - tb = MockOperator(task_id="b", arg1=v) - tc = MockOperator(task_id="c", arg1=v) - tb >> tc - - td = EmptyOperator(task_id="d") - ta >> g.expand(v=["x", "y", "z"]) >> td - - dr = dag_maker.create_dagrun(state="success") - - result = find_relevant_relatives( - normal_tasks=normal_tasks, - mapped_tasks=mapped_tasks, - direction="upstream", - dag=dag, - run_id=dr.run_id, - session=session, - ) - assert result == expected diff --git a/dev/README_RELEASE_AIRFLOW.md b/dev/README_RELEASE_AIRFLOW.md index 70bd555df1541..68bb1e2ff4dea 100644 --- a/dev/README_RELEASE_AIRFLOW.md +++ b/dev/README_RELEASE_AIRFLOW.md @@ -274,7 +274,7 @@ export GPG_TTY=$(tty) # Set Version export VERSION=3.1.3 export VERSION_SUFFIX=rc1 -export VERSION_RC=${VERSION}${VERSION_RC} +export VERSION_RC=${VERSION}${VERSION_SUFFIX} export VERSION_BRANCH=3-1 export TASK_SDK_VERSION=1.1.3 export TASK_SDK_VERSION_RC=${TASK_SDK_VERSION}${VERSION_SUFFIX} diff --git a/dev/breeze/src/airflow_breeze/commands/release_candidate_command.py b/dev/breeze/src/airflow_breeze/commands/release_candidate_command.py index 1f3da43717c53..e993209fe3bbf 100644 --- a/dev/breeze/src/airflow_breeze/commands/release_candidate_command.py +++ b/dev/breeze/src/airflow_breeze/commands/release_candidate_command.py @@ -456,13 +456,18 @@ def clone_asf_repo(version, repo_root): console_print("[success]Cloned ASF repo successfully") -def move_artifacts_to_svn(version, task_sdk_version, repo_root): +def move_artifacts_to_svn( + version, version_without_rc, task_sdk_version, task_sdk_version_without_rc, repo_root +): if confirm_action("Do you want to move artifacts to SVN?"): os.chdir(f"{repo_root}/asf-dist/dev/airflow") run_command(["svn", "mkdir", f"{version}"], check=True) - run_command(f"mv {repo_root}/dist/*{version}* {version}/", check=True, shell=True) + run_command(f"mv {repo_root}/dist/*{version_without_rc}* {version}/", check=True, shell=True) + run_command(["svn", "mkdir", f"task-sdk/{task_sdk_version}"]) run_command( - f"mv {repo_root}/dist/*{task_sdk_version}* task-sdk/{task_sdk_version}/", check=True, shell=True + f"mv {repo_root}/dist/*{task_sdk_version_without_rc}* task-sdk/{task_sdk_version}/", + check=True, + shell=True, ) console_print("[success]Moved artifacts to SVN:") run_command(["ls"]) @@ -470,16 +475,22 @@ def move_artifacts_to_svn(version, task_sdk_version, repo_root): run_command([f"ls task-sdk/{task_sdk_version}"]) -def push_artifacts_to_asf_repo(version, repo_root): +def push_artifacts_to_asf_repo(version, task_sdk_version, repo_root): if confirm_action("Do you want to push artifacts to ASF repo?"): - console_print("Files to push to svn:") + console_print("Airflow Version Files to push to svn:") if not get_dry_run(): os.chdir(f"{repo_root}/asf-dist/dev/airflow/{version}") run_command(["ls"]) confirm_action("Do you want to continue?", abort=True) run_command("svn add *", check=True, shell=True) + console_print("Task SDK Version Files to push to svn:") + if not get_dry_run(): + os.chdir(f"{repo_root}/asf-dist/dev/airflow/task-sdk/{task_sdk_version}") + run_command(["ls"]) + confirm_action("Do you want to continue?", abort=True) + run_command("svn add *", check=True, shell=True) run_command( - ["svn", "commit", "-m", f"Add artifacts for Airflow {version}"], + ["svn", "commit", "-m", f"Add artifacts for Airflow {version} and Task SDK {task_sdk_version}"], check=True, ) console_print("[success]Files pushed to svn") @@ -738,9 +749,11 @@ def publish_release_candidate( # Clone the asf repo clone_asf_repo(version, airflow_repo_root) # Move artifacts to SVN - move_artifacts_to_svn(version, task_sdk_version, airflow_repo_root) + move_artifacts_to_svn( + version, version_without_rc, task_sdk_version, task_sdk_version_without_rc, airflow_repo_root + ) # Push the artifacts to the asf repo - push_artifacts_to_asf_repo(version, airflow_repo_root) + push_artifacts_to_asf_repo(version, task_sdk_version, airflow_repo_root) # Remove old releases remove_old_releases(version, airflow_repo_root) diff --git a/reproducible_build.yaml b/reproducible_build.yaml index 4bff1d7b12507..e415d73e96460 100644 --- a/reproducible_build.yaml +++ b/reproducible_build.yaml @@ -1,2 +1,2 @@ -release-notes-hash: a61b70f4f9325bdd2db9fa8495474a39 -source-date-epoch: 1764766576 +release-notes-hash: 1cef4f069cb23b395098f97d070d8d0a +source-date-epoch: 1765212168