From 8a5f41cee7f26dce155ef5877c110312bddaa613 Mon Sep 17 00:00:00 2001 From: Shushank Ranjan Date: Sun, 14 Jun 2026 22:23:34 +0530 Subject: [PATCH] Bug Fixed --- .../src/airflow/sdk/definitions/variable.py | 15 ++---------- .../task_sdk/definitions/test_variables.py | 24 +++++++++++++++++++ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/task-sdk/src/airflow/sdk/definitions/variable.py b/task-sdk/src/airflow/sdk/definitions/variable.py index a379022a90918..e327b76547014 100644 --- a/task-sdk/src/airflow/sdk/definitions/variable.py +++ b/task-sdk/src/airflow/sdk/definitions/variable.py @@ -17,7 +17,6 @@ from __future__ import annotations -import logging from collections.abc import Sequence from typing import Any @@ -26,8 +25,6 @@ from airflow.sdk.definitions._internal.types import NOTSET from airflow.sdk.log import mask_secret -log = logging.getLogger(__name__) - @attrs.define class Variable: @@ -60,13 +57,9 @@ def get(cls, key: str, default: Any = NOTSET, deserialize_json: bool = False): @classmethod def set(cls, key: str, value: Any, description: str | None = None, serialize_json: bool = False) -> None: - from airflow.sdk.exceptions import AirflowRuntimeError from airflow.sdk.execution_time.context import _set_variable - try: - return _set_variable(key, value, description, serialize_json=serialize_json) - except AirflowRuntimeError as e: - log.exception(e) + _set_variable(key, value, description, serialize_json=serialize_json) @classmethod def keys(cls, prefix: str | None = None) -> Sequence[str]: @@ -94,10 +87,6 @@ def keys(cls, prefix: str | None = None) -> Sequence[str]: @classmethod def delete(cls, key: str) -> None: - from airflow.sdk.exceptions import AirflowRuntimeError from airflow.sdk.execution_time.context import _delete_variable - try: - _delete_variable(key=key) - except AirflowRuntimeError as e: - log.exception(e) + _delete_variable(key=key) diff --git a/task-sdk/tests/task_sdk/definitions/test_variables.py b/task-sdk/tests/task_sdk/definitions/test_variables.py index 6e94ccf503f8c..23db1bbef4d96 100644 --- a/task-sdk/tests/task_sdk/definitions/test_variables.py +++ b/task-sdk/tests/task_sdk/definitions/test_variables.py @@ -89,6 +89,30 @@ def test_var_set(self, key, value, description, serialize_json, mock_supervisor_ ), ) + def test_var_set_raises_on_error(self, mock_supervisor_comms): + """Variable.set() must propagate AirflowRuntimeError so the task fails on a rejected write.""" + from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType + from airflow.sdk.execution_time.comms import ErrorResponse + + mock_supervisor_comms.send.side_effect = AirflowRuntimeError( + error=ErrorResponse(error=ErrorType.API_SERVER_ERROR, detail={"message": "forbidden"}) + ) + + with pytest.raises(AirflowRuntimeError): + Variable.set(key="forbidden_key", value="v") + + def test_var_delete_raises_on_error(self, mock_supervisor_comms): + """Variable.delete() must propagate AirflowRuntimeError so the task fails on a rejected delete.""" + from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType + from airflow.sdk.execution_time.comms import ErrorResponse + + mock_supervisor_comms.send.side_effect = AirflowRuntimeError( + error=ErrorResponse(error=ErrorType.API_SERVER_ERROR, detail={"message": "forbidden"}) + ) + + with pytest.raises(AirflowRuntimeError): + Variable.delete(key="forbidden_key") + class TestVariableKeys: @pytest.mark.parametrize(