From 3309ca1c144044fea8fde5f79a6dadcaae6c5d00 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 24 Feb 2025 14:16:47 +0530 Subject: [PATCH 01/30] Moving BaseOperatorLink to task sdk --- .pre-commit-config.yaml | 4 ++-- airflow/models/__init__.py | 3 +-- airflow/models/baseoperator.py | 2 +- airflow/serialization/serialized_objects.py | 2 +- docs/apache-airflow/howto/define-extra-link.rst | 6 +++--- .../src/airflow/providers/amazon/aws/links/base_aws.py | 3 ++- .../google/src/airflow/providers/google/cloud/links/base.py | 3 ++- .../src/airflow/providers/google/cloud/links/datafusion.py | 3 ++- .../src/airflow/providers/google/cloud/links/dataproc.py | 3 ++- .../airflow/providers/standard/operators/trigger_dagrun.py | 3 ++- .../src/airflow/providers/standard/sensors/external_task.py | 2 +- providers/yandex/src/airflow/providers/yandex/links/yq.py | 3 ++- .../airflow/sdk/definitions/_internal/abstractoperator.py | 2 +- .../src/airflow/sdk/definitions}/baseoperatorlink.py | 0 task_sdk/src/airflow/sdk/definitions/mappedoperator.py | 2 +- tests/cli/commands/local_commands/test_plugins_command.py | 2 +- tests/serialization/test_dag_serialization.py | 2 +- tests_common/test_utils/compat.py | 2 +- 18 files changed, 26 insertions(+), 21 deletions(-) rename {airflow/models => task_sdk/src/airflow/sdk/definitions}/baseoperatorlink.py (100%) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ab73a9dfe8f0d..00568f9c32e00 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -716,8 +716,8 @@ repos: - id: check-base-operator-usage language: pygrep name: Check BaseOperatorLink core imports - description: Make sure BaseOperatorLink is imported from airflow.models.baseoperatorlink in core - entry: "from airflow\\.models import.* BaseOperatorLink" + description: Make sure BaseOperatorLink is imported from airflow.sdk.definitions.baseoperatorlink in core + entry: "from airflow\\.sdk\\.definitions\\.baseoperatorlink import.* BaseOperatorLink\\b" files: \.py$ pass_filenames: true exclude: > diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index 6bd3883b139af..5cf1d1d83f693 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -88,7 +88,6 @@ def __getattr__(name): "ID_LEN": "airflow.models.base", "Base": "airflow.models.base", "BaseOperator": "airflow.models.baseoperator", - "BaseOperatorLink": "airflow.models.baseoperatorlink", "Connection": "airflow.models.connection", "DagBag": "airflow.models.dagbag", "DagModel": "airflow.models.dag", @@ -117,7 +116,6 @@ def __getattr__(name): from airflow.jobs.job import Job from airflow.models.base import ID_LEN, Base from airflow.models.baseoperator import BaseOperator - from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.connection import Connection from airflow.models.dag import DAG, DagModel, DagTag from airflow.models.dagbag import DagBag @@ -137,4 +135,5 @@ def __getattr__(name): from airflow.models.trigger import Trigger from airflow.models.variable import Variable from airflow.models.xcom import XCom + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.sdk.definitions.param import Param diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index ea6a7c34b1149..9f3ca94559d57 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -93,9 +93,9 @@ from sqlalchemy.orm import Session from airflow.models.abstractoperator import TaskStateChangeCallback - from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DAG as SchedulerDAG from airflow.models.operator import Operator + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.sdk.definitions.node import DAGNode from airflow.ti_deps.deps.base_ti_dep import BaseTIDep from airflow.triggers.base import BaseTrigger, StartTriggerArgs diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 9b471d67b5cac..4c1a2bc6f405b 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -40,7 +40,6 @@ from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest from airflow.exceptions import AirflowException, SerializationError, TaskDeferred from airflow.models.baseoperator import BaseOperator -from airflow.models.baseoperatorlink import BaseOperatorLink, XComOperatorLink from airflow.models.connection import Connection from airflow.models.dag import DAG, _get_model_data_interval from airflow.models.expandinput import ( @@ -64,6 +63,7 @@ BaseAsset, ) from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink, XComOperatorLink from airflow.sdk.definitions.mappedoperator import MappedOperator from airflow.sdk.definitions.param import Param, ParamsDict from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup diff --git a/docs/apache-airflow/howto/define-extra-link.rst b/docs/apache-airflow/howto/define-extra-link.rst index 14864d1450036..e7458f24fa5d6 100644 --- a/docs/apache-airflow/howto/define-extra-link.rst +++ b/docs/apache-airflow/howto/define-extra-link.rst @@ -31,7 +31,7 @@ The following code shows how to add extra links to an operator via Plugins: .. code-block:: python from airflow.models.baseoperator import BaseOperator - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.models.taskinstancekey import TaskInstanceKey from airflow.plugins_manager import AirflowPlugin @@ -85,7 +85,7 @@ tasks using :class:`~airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Ope .. code-block:: python from airflow.models.baseoperator import BaseOperator - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.models.taskinstancekey import TaskInstanceKey from airflow.plugins_manager import AirflowPlugin from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator @@ -128,7 +128,7 @@ Console, but if we wanted to change that link we could: .. code-block:: python from airflow.models.baseoperator import BaseOperator - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.xcom import XCom from airflow.plugins_manager import AirflowPlugin diff --git a/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py b/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py index 83da4dd93cba9..eb3f210b8d1dd 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py @@ -19,8 +19,9 @@ from typing import TYPE_CHECKING, ClassVar -from airflow.models import BaseOperatorLink, XCom +from airflow.models import XCom from airflow.providers.amazon.aws.utils.suppress import return_on_error +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models import BaseOperator diff --git a/providers/google/src/airflow/providers/google/cloud/links/base.py b/providers/google/src/airflow/providers/google/cloud/links/base.py index 5c4a39b78146f..07aaf569034e4 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/base.py +++ b/providers/google/src/airflow/providers/google/cloud/links/base.py @@ -19,7 +19,8 @@ from typing import TYPE_CHECKING, ClassVar -from airflow.models import BaseOperatorLink, XCom +from airflow.models import XCom +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models import BaseOperator diff --git a/providers/google/src/airflow/providers/google/cloud/links/datafusion.py b/providers/google/src/airflow/providers/google/cloud/links/datafusion.py index 4f3dd0904427b..92b63ef64f6f5 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/links/datafusion.py @@ -21,7 +21,8 @@ from typing import TYPE_CHECKING, ClassVar -from airflow.models import BaseOperatorLink, XCom +from airflow.models import XCom +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models import BaseOperator diff --git a/providers/google/src/airflow/providers/google/cloud/links/dataproc.py b/providers/google/src/airflow/providers/google/cloud/links/dataproc.py index 556b7c60a4bb1..cc08c8c01b1dd 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/dataproc.py +++ b/providers/google/src/airflow/providers/google/cloud/links/dataproc.py @@ -25,8 +25,9 @@ import attr from airflow.exceptions import AirflowProviderDeprecationWarning -from airflow.models import BaseOperatorLink, XCom +from airflow.models import XCom from airflow.providers.google.cloud.links.base import BASE_LINK, BaseGoogleLink +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models import BaseOperator diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index 2cd2143c0dc14..0491ad9d419bc 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -34,12 +34,13 @@ DagNotFound, DagRunAlreadyExists, ) -from airflow.models import BaseOperator, BaseOperatorLink +from airflow.models import BaseOperator from airflow.models.dag import DagModel from airflow.models.dagbag import DagBag from airflow.models.dagrun import DagRun from airflow.models.xcom import XCom from airflow.providers.standard.triggers.external_task import DagStateTrigger +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.utils import timezone from airflow.utils.helpers import build_airflow_url_with_query from airflow.utils.session import provide_session diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py index bedbf1c2ed07d..6e72d1e4ff3b4 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py +++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py @@ -25,12 +25,12 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowSkipException -from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DagModel from airflow.models.dagbag import DagBag from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.triggers.external_task import WorkflowTrigger from airflow.providers.standard.utils.sensor_helper import _get_count, _get_external_task_group_task_ids +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.sensors.base import BaseSensorOperator from airflow.utils.file import correct_maybe_zipped from airflow.utils.helpers import build_airflow_url_with_query diff --git a/providers/yandex/src/airflow/providers/yandex/links/yq.py b/providers/yandex/src/airflow/providers/yandex/links/yq.py index 86babfec0f04a..92d098c18886f 100644 --- a/providers/yandex/src/airflow/providers/yandex/links/yq.py +++ b/providers/yandex/src/airflow/providers/yandex/links/yq.py @@ -18,7 +18,8 @@ from typing import TYPE_CHECKING -from airflow.models import BaseOperatorLink, XCom +from airflow.models import XCom +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models import BaseOperator diff --git a/task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py b/task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py index 481fb2ee773d7..507349abfdca4 100644 --- a/task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py @@ -41,8 +41,8 @@ if TYPE_CHECKING: import jinja2 - from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.sdk.definitions.baseoperator import BaseOperator + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.sdk.definitions.context import Context from airflow.sdk.definitions.dag import DAG from airflow.sdk.definitions.mappedoperator import MappedOperator diff --git a/airflow/models/baseoperatorlink.py b/task_sdk/src/airflow/sdk/definitions/baseoperatorlink.py similarity index 100% rename from airflow/models/baseoperatorlink.py rename to task_sdk/src/airflow/sdk/definitions/baseoperatorlink.py diff --git a/task_sdk/src/airflow/sdk/definitions/mappedoperator.py b/task_sdk/src/airflow/sdk/definitions/mappedoperator.py index 92f83792f3c79..2608d4eefebf0 100644 --- a/task_sdk/src/airflow/sdk/definitions/mappedoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/mappedoperator.py @@ -63,7 +63,6 @@ from airflow.models.abstractoperator import ( TaskStateChangeCallback, ) - from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.expandinput import ( ExpandInput, OperatorExpandArgument, @@ -71,6 +70,7 @@ ) from airflow.models.xcom_arg import XComArg from airflow.sdk.definitions.baseoperator import BaseOperator + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.sdk.definitions.dag import DAG from airflow.sdk.definitions.param import ParamsDict from airflow.sdk.types import Operator diff --git a/tests/cli/commands/local_commands/test_plugins_command.py b/tests/cli/commands/local_commands/test_plugins_command.py index af6a9ca5a27eb..7d5f09490662b 100644 --- a/tests/cli/commands/local_commands/test_plugins_command.py +++ b/tests/cli/commands/local_commands/test_plugins_command.py @@ -26,8 +26,8 @@ from airflow.cli import cli_parser from airflow.cli.commands.local_commands import plugins_command from airflow.listeners.listener import get_listener_manager -from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.plugins_manager import AirflowPlugin +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from tests.plugins.test_plugin import AirflowTestPlugin as ComplexAirflowPlugin from tests_common.test_utils.mock_plugins import mock_plugin_manager diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 61d33bc450a47..7985b039019b6 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -58,7 +58,6 @@ ) from airflow.hooks.base import BaseHook from airflow.models.baseoperator import BaseOperator -from airflow.models.baseoperatorlink import XComOperatorLink from airflow.models.connection import Connection from airflow.models.dag import DAG from airflow.models.dagbag import DagBag @@ -69,6 +68,7 @@ from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.sensors.bash import BashSensor from airflow.sdk.definitions.asset import Asset +from airflow.sdk.definitions.baseoperatorlink import XComOperatorLink from airflow.sdk.definitions.param import Param, ParamsDict from airflow.security import permissions from airflow.serialization.enums import Encoding diff --git a/tests_common/test_utils/compat.py b/tests_common/test_utils/compat.py index 1ed69decbb4f1..318af6c8958f4 100644 --- a/tests_common/test_utils/compat.py +++ b/tests_common/test_utils/compat.py @@ -36,7 +36,7 @@ from airflow.models.errors import ImportError as ParseImportError # type: ignore[no-redef,attr-defined] try: - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink except ImportError: # Compatibility for Airflow 2.7.* from airflow.models.baseoperator import BaseOperatorLink From c9a5c263f5a068081bcca754ece5b9602abb66ee Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 24 Feb 2025 16:37:04 +0530 Subject: [PATCH 02/30] few more missed occurences --- .../src/airflow/providers/databricks/operators/databricks.py | 3 ++- .../providers/databricks/plugins/databricks_workflow.py | 3 ++- .../dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py | 3 ++- .../providers/google/cloud/operators/dataproc_metastore.py | 3 ++- .../google/marketing_platform/links/analytics_admin.py | 3 ++- .../providers/microsoft/azure/operators/data_factory.py | 3 ++- .../src/airflow/providers/microsoft/azure/operators/powerbi.py | 3 ++- .../src/airflow/providers/microsoft/azure/operators/synapse.py | 3 ++- 8 files changed, 16 insertions(+), 8 deletions(-) diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py index 4303e32cb562a..0bb8d27880596 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py @@ -29,7 +29,7 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException -from airflow.models import BaseOperator, BaseOperatorLink, XCom +from airflow.models import BaseOperator, XCom from airflow.providers.databricks.hooks.databricks import DatabricksHook, RunLifeCycleState, RunState from airflow.providers.databricks.operators.databricks_workflow import ( DatabricksWorkflowTaskGroup, @@ -41,6 +41,7 @@ ) from airflow.providers.databricks.triggers.databricks import DatabricksExecutionTrigger from airflow.providers.databricks.utils.databricks import normalise_json_content, validate_trigger_event +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey diff --git a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py index 81cf09b0f8d6b..7bef3ffbd7a4f 100644 --- a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py +++ b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py @@ -26,13 +26,13 @@ from flask_appbuilder.api import expose from airflow.exceptions import AirflowException, TaskInstanceNotFound -from airflow.models import BaseOperator, BaseOperatorLink from airflow.models.dag import DAG, clear_task_instances from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance, TaskInstanceKey from airflow.models.xcom import XCom from airflow.plugins_manager import AirflowPlugin from airflow.providers.databricks.hooks.databricks import DatabricksHook +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.utils.airflow_flask_app import AirflowApp from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import NEW_SESSION, provide_session @@ -44,6 +44,7 @@ if TYPE_CHECKING: from sqlalchemy.orm.session import Session + from airflow.models import BaseOperator from airflow.providers.databricks.operators.databricks import DatabricksTaskBaseOperator diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py index d9aab8e9e2f67..6ce2a9352592f 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py @@ -24,7 +24,7 @@ from typing import TYPE_CHECKING, Any from airflow.configuration import conf -from airflow.models import BaseOperator, BaseOperatorLink, XCom +from airflow.models import BaseOperator, XCom from airflow.providers.dbt.cloud.hooks.dbt import ( DbtCloudHook, DbtCloudJobRunException, @@ -33,6 +33,7 @@ ) from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger from airflow.providers.dbt.cloud.utils.openlineage import generate_openlineage_events_from_dbt_cloud_run +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.providers.openlineage.extractors import OperatorLineage diff --git a/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py b/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py index 410b80fbbeeca..8245357115d8c 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py @@ -24,11 +24,11 @@ from typing import TYPE_CHECKING from airflow.exceptions import AirflowException -from airflow.models import BaseOperator, BaseOperatorLink from airflow.models.xcom import XCom from airflow.providers.google.cloud.hooks.dataproc_metastore import DataprocMetastoreHook from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator from airflow.providers.google.common.links.storage import StorageLink +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from google.api_core.exceptions import AlreadyExists from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault from google.api_core.retry import Retry, exponential_sleep_generator @@ -37,6 +37,7 @@ from google.cloud.metastore_v1.types.metastore import DatabaseDumpSpec, Restore if TYPE_CHECKING: + from airflow.models import BaseOperator from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context from google.protobuf.field_mask_pb2 import FieldMask diff --git a/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py b/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py index 52d62851ed987..c432aa9b6d7ac 100644 --- a/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py +++ b/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py @@ -18,7 +18,8 @@ from typing import TYPE_CHECKING, ClassVar -from airflow.models import BaseOperator, BaseOperatorLink, XCom +from airflow.models import BaseOperator, XCom +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py index d6313c68b2286..2e576c59d1d52 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py @@ -25,7 +25,7 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook -from airflow.models import BaseOperator, BaseOperatorLink, XCom +from airflow.models import BaseOperator, XCom from airflow.providers.microsoft.azure.hooks.data_factory import ( AzureDataFactoryHook, AzureDataFactoryPipelineRunException, @@ -33,6 +33,7 @@ get_field, ) from airflow.providers.microsoft.azure.triggers.data_factory import AzureDataFactoryTrigger +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py index 1c6878c27af68..3797ee5af9706 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -21,9 +21,10 @@ from typing import TYPE_CHECKING, Any from airflow.exceptions import AirflowException -from airflow.models import BaseOperator, BaseOperatorLink +from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.powerbi import PowerBIHook from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from msgraph_core import APIVersion diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py index 534d82446c221..6413ec45bf931 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py @@ -23,7 +23,7 @@ from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook -from airflow.models import BaseOperator, BaseOperatorLink, XCom +from airflow.models import BaseOperator, XCom from airflow.providers.microsoft.azure.hooks.synapse import ( AzureSynapseHook, AzureSynapsePipelineHook, @@ -31,6 +31,7 @@ AzureSynapsePipelineRunStatus, AzureSynapseSparkBatchRunStatus, ) +from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey From 86647d499a92be2b9e34bdc75a921e7ac2d71816 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 24 Feb 2025 16:45:59 +0530 Subject: [PATCH 03/30] adding lazy import --- .pre-commit-config.yaml | 2 +- docs/apache-airflow/howto/define-extra-link.rst | 2 +- task_sdk/src/airflow/sdk/__init__.py | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 00568f9c32e00..5af2e0270ee28 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -717,7 +717,7 @@ repos: language: pygrep name: Check BaseOperatorLink core imports description: Make sure BaseOperatorLink is imported from airflow.sdk.definitions.baseoperatorlink in core - entry: "from airflow\\.sdk\\.definitions\\.baseoperatorlink import.* BaseOperatorLink\\b" + entry: "from airflow\\.sdk\\..* import.* BaseOperatorLink\\b" files: \.py$ pass_filenames: true exclude: > diff --git a/docs/apache-airflow/howto/define-extra-link.rst b/docs/apache-airflow/howto/define-extra-link.rst index e7458f24fa5d6..476688778c5ff 100644 --- a/docs/apache-airflow/howto/define-extra-link.rst +++ b/docs/apache-airflow/howto/define-extra-link.rst @@ -31,7 +31,7 @@ The following code shows how to add extra links to an operator via Plugins: .. code-block:: python from airflow.models.baseoperator import BaseOperator - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink from airflow.models.taskinstancekey import TaskInstanceKey from airflow.plugins_manager import AirflowPlugin diff --git a/task_sdk/src/airflow/sdk/__init__.py b/task_sdk/src/airflow/sdk/__init__.py index 1fabbd2bd0308..7c174ee0747bb 100644 --- a/task_sdk/src/airflow/sdk/__init__.py +++ b/task_sdk/src/airflow/sdk/__init__.py @@ -52,6 +52,7 @@ __lazy_imports: dict[str, str] = { "BaseOperator": ".definitions.baseoperator", + "BaseOperatorLink": ".definitions.baseoperatorlink", "Connection": ".definitions.connection", "Param": ".definitions.param", "ParamsDict": ".definitions.param", From 9e87b8758a4611b7612c65ce9d8f5f94bd1d777f Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 24 Feb 2025 17:35:49 +0530 Subject: [PATCH 04/30] conditional import for providers --- .pre-commit-config.yaml | 7 ++-- .../providers/amazon/aws/links/base_aws.py | 7 +++- .../databricks/operators/databricks.py | 7 +++- .../databricks/plugins/databricks_workflow.py | 8 ++++- .../providers/databricks/version_compat.py | 36 +++++++++++++++++++ .../providers/dbt/cloud/operators/dbt.py | 8 ++++- .../airflow/providers/dbt/version_compat.py | 36 +++++++++++++++++++ .../providers/google/cloud/links/base.py | 7 +++- .../google/cloud/links/datafusion.py | 8 ++++- .../cloud/operators/dataproc_metastore.py | 9 ++++- .../links/analytics_admin.py | 9 ++++- .../microsoft/azure/operators/data_factory.py | 9 ++++- .../microsoft/azure/operators/powerbi.py | 9 ++++- .../microsoft/azure/operators/synapse.py | 9 ++++- .../providers/microsoft/version_compat.py | 36 +++++++++++++++++++ .../standard/operators/trigger_dagrun.py | 7 +++- .../standard/sensors/external_task.py | 9 ++++- .../src/airflow/providers/yandex/links/yq.py | 9 ++++- .../providers/yandex/version_compat.py | 36 +++++++++++++++++++ 19 files changed, 250 insertions(+), 16 deletions(-) create mode 100644 providers/databricks/src/airflow/providers/databricks/version_compat.py create mode 100644 providers/dbt/cloud/src/airflow/providers/dbt/version_compat.py create mode 100644 providers/microsoft/azure/src/airflow/providers/microsoft/version_compat.py create mode 100644 providers/yandex/src/airflow/providers/yandex/version_compat.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5af2e0270ee28..6369f210e2292 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -731,8 +731,11 @@ repos: - id: check-base-operator-usage language: pygrep name: Check BaseOperator[Link] other imports - description: Make sure BaseOperator[Link] is imported from airflow.models outside of core - entry: "from airflow\\.models\\.baseoperator(link)? import.* BaseOperator" + description: Make sure BaseOperator is imported from airflow.models and BaseOperatorLink is imported from airflow.models or airflow.sdk.* outside of core + entry: > + (from airflow\\.models\\.baseoperator import.* BaseOperator\\b| + from airflow\\.models import.* BaseOperatorLink\\b| + from airflow\\.sdk\\..* import.* BaseOperatorLink\\b) pass_filenames: true files: > (?x) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py b/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py index eb3f210b8d1dd..772228af8a61a 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py @@ -21,13 +21,18 @@ from airflow.models import XCom from airflow.providers.amazon.aws.utils.suppress import return_on_error -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.models import BaseOperator from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + BASE_AWS_CONSOLE_LINK = "https://console.{aws_domain}" diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py index 0bb8d27880596..36b0fee98e82c 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py @@ -41,13 +41,18 @@ ) from airflow.providers.databricks.triggers.databricks import DatabricksExecutionTrigger from airflow.providers.databricks.utils.databricks import normalise_json_content, validate_trigger_event -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context from airflow.utils.task_group import TaskGroup +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + DEFER_METHOD_NAME = "execute_complete" XCOM_RUN_ID_KEY = "run_id" XCOM_JOB_ID_KEY = "job_id" diff --git a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py index 7bef3ffbd7a4f..547893511bb25 100644 --- a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py +++ b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py @@ -26,13 +26,14 @@ from flask_appbuilder.api import expose from airflow.exceptions import AirflowException, TaskInstanceNotFound +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DAG, clear_task_instances from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance, TaskInstanceKey from airflow.models.xcom import XCom from airflow.plugins_manager import AirflowPlugin from airflow.providers.databricks.hooks.databricks import DatabricksHook -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.airflow_flask_app import AirflowApp from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import NEW_SESSION, provide_session @@ -47,6 +48,11 @@ from airflow.models import BaseOperator from airflow.providers.databricks.operators.databricks import DatabricksTaskBaseOperator +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink + REPAIR_WAIT_ATTEMPTS = os.getenv("DATABRICKS_REPAIR_WAIT_ATTEMPTS", 20) REPAIR_WAIT_DELAY = os.getenv("DATABRICKS_REPAIR_WAIT_DELAY", 0.5) diff --git a/providers/databricks/src/airflow/providers/databricks/version_compat.py b/providers/databricks/src/airflow/providers/databricks/version_compat.py new file mode 100644 index 0000000000000..21e7170194e36 --- /dev/null +++ b/providers/databricks/src/airflow/providers/databricks/version_compat.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY +# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS +# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT +# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE +# +from __future__ import annotations + + +def get_base_airflow_version_tuple() -> tuple[int, int, int]: + from packaging.version import Version + + from airflow import __version__ + + airflow_version = Version(__version__) + return airflow_version.major, airflow_version.minor, airflow_version.micro + + +AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) +AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py index 6ce2a9352592f..0b7d7594491d1 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py @@ -25,6 +25,7 @@ from airflow.configuration import conf from airflow.models import BaseOperator, XCom +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.providers.dbt.cloud.hooks.dbt import ( DbtCloudHook, DbtCloudJobRunException, @@ -33,12 +34,17 @@ ) from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger from airflow.providers.dbt.cloud.utils.openlineage import generate_openlineage_events_from_dbt_cloud_run -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +from airflow.providers.dbt.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.providers.openlineage.extractors import OperatorLineage from airflow.utils.context import Context +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink + class DbtCloudRunJobOperatorLink(BaseOperatorLink): """Allows users to monitor the triggered job run directly in dbt Cloud.""" diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/version_compat.py b/providers/dbt/cloud/src/airflow/providers/dbt/version_compat.py new file mode 100644 index 0000000000000..21e7170194e36 --- /dev/null +++ b/providers/dbt/cloud/src/airflow/providers/dbt/version_compat.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY +# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS +# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT +# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE +# +from __future__ import annotations + + +def get_base_airflow_version_tuple() -> tuple[int, int, int]: + from packaging.version import Version + + from airflow import __version__ + + airflow_version = Version(__version__) + return airflow_version.major, airflow_version.minor, airflow_version.micro + + +AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) +AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/google/src/airflow/providers/google/cloud/links/base.py b/providers/google/src/airflow/providers/google/cloud/links/base.py index 07aaf569034e4..9b106271be27a 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/base.py +++ b/providers/google/src/airflow/providers/google/cloud/links/base.py @@ -20,12 +20,17 @@ from typing import TYPE_CHECKING, ClassVar from airflow.models import XCom -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +from airflow.models.baseoperatorlink import BaseOperatorLink +from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.models import BaseOperator from airflow.models.taskinstancekey import TaskInstanceKey +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink BASE_LINK = "https://console.cloud.google.com" diff --git a/providers/google/src/airflow/providers/google/cloud/links/datafusion.py b/providers/google/src/airflow/providers/google/cloud/links/datafusion.py index 92b63ef64f6f5..7964359fdf587 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/links/datafusion.py @@ -22,13 +22,19 @@ from typing import TYPE_CHECKING, ClassVar from airflow.models import XCom -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +from airflow.models.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models import BaseOperator from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context +from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink BASE_LINK = "https://console.cloud.google.com/data-fusion" DATAFUSION_INSTANCE_LINK = BASE_LINK + "/locations/{region}/instances/{instance_name}?project={project_id}" diff --git a/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py b/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py index 8245357115d8c..f25d81f4dc90b 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py @@ -24,11 +24,11 @@ from typing import TYPE_CHECKING from airflow.exceptions import AirflowException +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.xcom import XCom from airflow.providers.google.cloud.hooks.dataproc_metastore import DataprocMetastoreHook from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator from airflow.providers.google.common.links.storage import StorageLink -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from google.api_core.exceptions import AlreadyExists from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault from google.api_core.retry import Retry, exponential_sleep_generator @@ -42,6 +42,13 @@ from airflow.utils.context import Context from google.protobuf.field_mask_pb2 import FieldMask +from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink + BASE_LINK = "https://console.cloud.google.com" METASTORE_BASE_LINK = BASE_LINK + "/dataproc/metastore/services/{region}/{service_id}" diff --git a/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py b/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py index c432aa9b6d7ac..ca4a3bad85803 100644 --- a/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py +++ b/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py @@ -19,12 +19,19 @@ from typing import TYPE_CHECKING, ClassVar from airflow.models import BaseOperator, XCom -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +from airflow.models.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context +from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink + BASE_LINK = "https://analytics.google.com/analytics/web/" diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py index 2e576c59d1d52..075ee3127fbd8 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py @@ -26,6 +26,7 @@ from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook from airflow.models import BaseOperator, XCom +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.providers.microsoft.azure.hooks.data_factory import ( AzureDataFactoryHook, AzureDataFactoryPipelineRunException, @@ -33,13 +34,19 @@ get_field, ) from airflow.providers.microsoft.azure.triggers.data_factory import AzureDataFactoryTrigger -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context +from airflow.providers.microsoft.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink + class AzureDataFactoryPipelineRunLink(LoggingMixin, BaseOperatorLink): """Construct a link to monitor a pipeline run in Azure Data Factory.""" diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py index 3797ee5af9706..c028644c8d432 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -22,9 +22,9 @@ from airflow.exceptions import AirflowException from airflow.models import BaseOperator +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.providers.microsoft.azure.hooks.powerbi import PowerBIHook from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from msgraph_core import APIVersion @@ -32,6 +32,13 @@ from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context +from airflow.providers.microsoft.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink + class PowerBILink(BaseOperatorLink): """Construct a link to monitor a dataset in Power BI.""" diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py index 6413ec45bf931..eea99540be63a 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py @@ -24,6 +24,7 @@ from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook from airflow.models import BaseOperator, XCom +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.providers.microsoft.azure.hooks.synapse import ( AzureSynapseHook, AzureSynapsePipelineHook, @@ -31,13 +32,19 @@ AzureSynapsePipelineRunStatus, AzureSynapseSparkBatchRunStatus, ) -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context from azure.synapse.spark.models import SparkBatchJobOptions +from airflow.providers.microsoft.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink + class AzureSynapseRunSparkBatchOperator(BaseOperator): """ diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/version_compat.py b/providers/microsoft/azure/src/airflow/providers/microsoft/version_compat.py new file mode 100644 index 0000000000000..21e7170194e36 --- /dev/null +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/version_compat.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY +# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS +# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT +# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE +# +from __future__ import annotations + + +def get_base_airflow_version_tuple() -> tuple[int, int, int]: + from packaging.version import Version + + from airflow import __version__ + + airflow_version = Version(__version__) + return airflow_version.major, airflow_version.minor, airflow_version.micro + + +AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) +AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index 0491ad9d419bc..000a5a3579da9 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -40,7 +40,7 @@ from airflow.models.dagrun import DagRun from airflow.models.xcom import XCom from airflow.providers.standard.triggers.external_task import DagStateTrigger -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils import timezone from airflow.utils.helpers import build_airflow_url_with_query from airflow.utils.session import provide_session @@ -62,6 +62,11 @@ # TODO: Remove once provider drops support for Airflow 2 from airflow.utils.context import Context +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + class TriggerDagRunLink(BaseOperatorLink): """ diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py index 6e72d1e4ff3b4..d1ed9ae536687 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py +++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py @@ -25,12 +25,12 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DagModel from airflow.models.dagbag import DagBag from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.triggers.external_task import WorkflowTrigger from airflow.providers.standard.utils.sensor_helper import _get_count, _get_external_task_group_task_ids -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.sensors.base import BaseSensorOperator from airflow.utils.file import correct_maybe_zipped from airflow.utils.helpers import build_airflow_url_with_query @@ -49,6 +49,13 @@ # TODO: Remove once provider drops support for Airflow 2 from airflow.utils.context import Context +from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink + class ExternalDagLink(BaseOperatorLink): """ diff --git a/providers/yandex/src/airflow/providers/yandex/links/yq.py b/providers/yandex/src/airflow/providers/yandex/links/yq.py index 92d098c18886f..dfb0239107c26 100644 --- a/providers/yandex/src/airflow/providers/yandex/links/yq.py +++ b/providers/yandex/src/airflow/providers/yandex/links/yq.py @@ -19,7 +19,7 @@ from typing import TYPE_CHECKING from airflow.models import XCom -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +from airflow.models.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models import BaseOperator @@ -31,6 +31,13 @@ # TODO: Remove once provider drops support for Airflow 2 from airflow.utils.context import Context +from airflow.providers.yandex.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink + XCOM_WEBLINK_KEY = "web_link" diff --git a/providers/yandex/src/airflow/providers/yandex/version_compat.py b/providers/yandex/src/airflow/providers/yandex/version_compat.py new file mode 100644 index 0000000000000..21e7170194e36 --- /dev/null +++ b/providers/yandex/src/airflow/providers/yandex/version_compat.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY +# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS +# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT +# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE +# +from __future__ import annotations + + +def get_base_airflow_version_tuple() -> tuple[int, int, int]: + from packaging.version import Version + + from airflow import __version__ + + airflow_version = Version(__version__) + return airflow_version.major, airflow_version.minor, airflow_version.micro + + +AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) +AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) From 76a8ce351deba88078003f5a949168a9c9d9aca6 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 24 Feb 2025 19:46:19 +0530 Subject: [PATCH 05/30] removing double occurences and fixing CI --- .pre-commit-config.yaml | 5 +++-- airflow/models/__init__.py | 3 ++- docs/apache-airflow/howto/define-extra-link.rst | 4 ++-- .../providers/databricks/plugins/databricks_workflow.py | 3 +-- .../cloud/src/airflow/providers/dbt/cloud/operators/dbt.py | 3 +-- .../google/src/airflow/providers/google/cloud/links/base.py | 3 +-- .../src/airflow/providers/google/cloud/links/datafusion.py | 3 +-- .../providers/google/cloud/operators/dataproc_metastore.py | 3 +-- .../google/marketing_platform/links/analytics_admin.py | 3 +-- .../providers/microsoft/azure/operators/data_factory.py | 3 +-- .../airflow/providers/microsoft/azure/operators/powerbi.py | 3 +-- .../airflow/providers/microsoft/azure/operators/synapse.py | 3 +-- .../src/airflow/providers/standard/sensors/external_task.py | 3 +-- providers/yandex/src/airflow/providers/yandex/links/yq.py | 3 +-- 14 files changed, 18 insertions(+), 27 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6369f210e2292..96bf5c317db68 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -716,10 +716,11 @@ repos: - id: check-base-operator-usage language: pygrep name: Check BaseOperatorLink core imports - description: Make sure BaseOperatorLink is imported from airflow.sdk.definitions.baseoperatorlink in core - entry: "from airflow\\.sdk\\..* import.* BaseOperatorLink\\b" + description: Make sure BaseOperatorLink is imported from airflow.sdk in core + entry: "^\\s*from airflow\\.sdk\\..* import.* BaseOperatorLink\\b" files: \.py$ pass_filenames: true + echo: True exclude: > (?x) ^airflow/decorators/.*$| diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index 5cf1d1d83f693..822ebff2d9c6c 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -88,6 +88,7 @@ def __getattr__(name): "ID_LEN": "airflow.models.base", "Base": "airflow.models.base", "BaseOperator": "airflow.models.baseoperator", + "BaseOperatorLink": "airflow.models.baseoperatorlink", "Connection": "airflow.models.connection", "DagBag": "airflow.models.dagbag", "DagModel": "airflow.models.dag", @@ -135,5 +136,5 @@ def __getattr__(name): from airflow.models.trigger import Trigger from airflow.models.variable import Variable from airflow.models.xcom import XCom - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink from airflow.sdk.definitions.param import Param diff --git a/docs/apache-airflow/howto/define-extra-link.rst b/docs/apache-airflow/howto/define-extra-link.rst index 476688778c5ff..de1cccdf73de0 100644 --- a/docs/apache-airflow/howto/define-extra-link.rst +++ b/docs/apache-airflow/howto/define-extra-link.rst @@ -85,7 +85,7 @@ tasks using :class:`~airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Ope .. code-block:: python from airflow.models.baseoperator import BaseOperator - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink from airflow.models.taskinstancekey import TaskInstanceKey from airflow.plugins_manager import AirflowPlugin from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator @@ -128,7 +128,7 @@ Console, but if we wanted to change that link we could: .. code-block:: python from airflow.models.baseoperator import BaseOperator - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.xcom import XCom from airflow.plugins_manager import AirflowPlugin diff --git a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py index 547893511bb25..e0958abab0107 100644 --- a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py +++ b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py @@ -26,7 +26,6 @@ from flask_appbuilder.api import expose from airflow.exceptions import AirflowException, TaskInstanceNotFound -from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DAG, clear_task_instances from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance, TaskInstanceKey @@ -51,7 +50,7 @@ if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink else: - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] REPAIR_WAIT_ATTEMPTS = os.getenv("DATABRICKS_REPAIR_WAIT_ATTEMPTS", 20) diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py index 0b7d7594491d1..1ac2cb8c4c2a8 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py @@ -25,7 +25,6 @@ from airflow.configuration import conf from airflow.models import BaseOperator, XCom -from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.providers.dbt.cloud.hooks.dbt import ( DbtCloudHook, DbtCloudJobRunException, @@ -43,7 +42,7 @@ if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink else: - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] class DbtCloudRunJobOperatorLink(BaseOperatorLink): diff --git a/providers/google/src/airflow/providers/google/cloud/links/base.py b/providers/google/src/airflow/providers/google/cloud/links/base.py index 9b106271be27a..77dd5649f35ce 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/base.py +++ b/providers/google/src/airflow/providers/google/cloud/links/base.py @@ -20,7 +20,6 @@ from typing import TYPE_CHECKING, ClassVar from airflow.models import XCom -from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: @@ -30,7 +29,7 @@ if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink else: - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] BASE_LINK = "https://console.cloud.google.com" diff --git a/providers/google/src/airflow/providers/google/cloud/links/datafusion.py b/providers/google/src/airflow/providers/google/cloud/links/datafusion.py index 7964359fdf587..f8522f044a5d5 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/links/datafusion.py @@ -22,7 +22,6 @@ from typing import TYPE_CHECKING, ClassVar from airflow.models import XCom -from airflow.models.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models import BaseOperator @@ -34,7 +33,7 @@ if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink else: - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] BASE_LINK = "https://console.cloud.google.com/data-fusion" DATAFUSION_INSTANCE_LINK = BASE_LINK + "/locations/{region}/instances/{instance_name}?project={project_id}" diff --git a/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py b/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py index f25d81f4dc90b..9f330c1a8c2dd 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py @@ -24,7 +24,6 @@ from typing import TYPE_CHECKING from airflow.exceptions import AirflowException -from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.xcom import XCom from airflow.providers.google.cloud.hooks.dataproc_metastore import DataprocMetastoreHook from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator @@ -47,7 +46,7 @@ if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink else: - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] BASE_LINK = "https://console.cloud.google.com" diff --git a/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py b/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py index ca4a3bad85803..222f666033c70 100644 --- a/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py +++ b/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py @@ -19,7 +19,6 @@ from typing import TYPE_CHECKING, ClassVar from airflow.models import BaseOperator, XCom -from airflow.models.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey @@ -30,7 +29,7 @@ if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink else: - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] BASE_LINK = "https://analytics.google.com/analytics/web/" diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py index 075ee3127fbd8..1a2b1c30c6b69 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py @@ -26,7 +26,6 @@ from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook from airflow.models import BaseOperator, XCom -from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.providers.microsoft.azure.hooks.data_factory import ( AzureDataFactoryHook, AzureDataFactoryPipelineRunException, @@ -45,7 +44,7 @@ if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink else: - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] class AzureDataFactoryPipelineRunLink(LoggingMixin, BaseOperatorLink): diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py index c028644c8d432..5674e0aee59c8 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -22,7 +22,6 @@ from airflow.exceptions import AirflowException from airflow.models import BaseOperator -from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.providers.microsoft.azure.hooks.powerbi import PowerBIHook from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger @@ -37,7 +36,7 @@ if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink else: - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] class PowerBILink(BaseOperatorLink): diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py index eea99540be63a..6ad9927548154 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py @@ -24,7 +24,6 @@ from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook from airflow.models import BaseOperator, XCom -from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.providers.microsoft.azure.hooks.synapse import ( AzureSynapseHook, AzureSynapsePipelineHook, @@ -43,7 +42,7 @@ if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink else: - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] class AzureSynapseRunSparkBatchOperator(BaseOperator): diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py index d1ed9ae536687..7b8675f792739 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py +++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py @@ -25,7 +25,6 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowSkipException -from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DagModel from airflow.models.dagbag import DagBag from airflow.providers.standard.operators.empty import EmptyOperator @@ -54,7 +53,7 @@ if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink else: - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] class ExternalDagLink(BaseOperatorLink): diff --git a/providers/yandex/src/airflow/providers/yandex/links/yq.py b/providers/yandex/src/airflow/providers/yandex/links/yq.py index dfb0239107c26..88112e538b931 100644 --- a/providers/yandex/src/airflow/providers/yandex/links/yq.py +++ b/providers/yandex/src/airflow/providers/yandex/links/yq.py @@ -19,7 +19,6 @@ from typing import TYPE_CHECKING from airflow.models import XCom -from airflow.models.baseoperatorlink import BaseOperatorLink if TYPE_CHECKING: from airflow.models import BaseOperator @@ -36,7 +35,7 @@ if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink else: - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] XCOM_WEBLINK_KEY = "web_link" From 8d21dc5796e174392597fd289f54db8e17aca692 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 24 Feb 2025 19:55:59 +0530 Subject: [PATCH 06/30] adding newsfragments --- newsfragments/47008.significant.rst | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 newsfragments/47008.significant.rst diff --git a/newsfragments/47008.significant.rst b/newsfragments/47008.significant.rst new file mode 100644 index 0000000000000..6cff807d56243 --- /dev/null +++ b/newsfragments/47008.significant.rst @@ -0,0 +1,22 @@ +``BaseOperatorLink`` has now been moved into the task SDK to be consumed by DAG authors to write custom operator links. + +Any occurrences of imports from ``airflow.models.baseoperatorlink`` will need to be updated to ``airflow.sdk.definitions.baseoperatorlink`` + +* Types of change + + * [x] Dag changes + * [ ] Config changes + * [ ] API changes + * [ ] CLI changes + * [ ] Behaviour changes + * [x] Plugin changes + * [ ] Dependency changes + * [ ] Code interface changes + +* Migration rules needed + + * ruff + + * AIR302 + + * [ ] ``airflow.models.baseoperatorlink`` → ``airflow.sdk.definitions.baseoperatorlink`` From 5bcacb1380d011d02d151aba62b1d3b6266a4d67 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 25 Feb 2025 09:27:40 +0530 Subject: [PATCH 07/30] removing old ref from init --- airflow/models/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index 822ebff2d9c6c..c0573dede3936 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -88,7 +88,6 @@ def __getattr__(name): "ID_LEN": "airflow.models.base", "Base": "airflow.models.base", "BaseOperator": "airflow.models.baseoperator", - "BaseOperatorLink": "airflow.models.baseoperatorlink", "Connection": "airflow.models.connection", "DagBag": "airflow.models.dagbag", "DagModel": "airflow.models.dag", From ca4a035a2dbfd22e8605b9e8ee692bd4ac72d778 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 25 Feb 2025 09:32:47 +0530 Subject: [PATCH 08/30] adding new ref to init --- airflow/models/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index c0573dede3936..89f93e1f2a9d5 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -88,6 +88,7 @@ def __getattr__(name): "ID_LEN": "airflow.models.base", "Base": "airflow.models.base", "BaseOperator": "airflow.models.baseoperator", + "BaseOperatorLink": "airflow.sdk.definitions.baseoperatorlink", "Connection": "airflow.models.connection", "DagBag": "airflow.models.dagbag", "DagModel": "airflow.models.dag", From 56c5a867be5629509de11a4749bd25f546b3d210 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 25 Feb 2025 11:12:19 +0530 Subject: [PATCH 09/30] fixing tests --- .../src/airflow/providers/google/cloud/links/dataproc.py | 7 ++++++- tests/always/test_project_structure.py | 4 ++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/providers/google/src/airflow/providers/google/cloud/links/dataproc.py b/providers/google/src/airflow/providers/google/cloud/links/dataproc.py index cc08c8c01b1dd..8b9c67d82b4d7 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/dataproc.py +++ b/providers/google/src/airflow/providers/google/cloud/links/dataproc.py @@ -27,13 +27,18 @@ from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models import XCom from airflow.providers.google.cloud.links.base import BASE_LINK, BaseGoogleLink -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.models import BaseOperator from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + def __getattr__(name: str) -> Any: # PEP-562: deprecate module-level variable diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index 34ec8b6af1f1c..f847277cfde41 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -82,6 +82,10 @@ def test_providers_modules_should_have_tests(self): "providers/celery/tests/unit/celery/executors/test_celery_executor_utils.py", "providers/celery/tests/unit/celery/executors/test_default_celery.py", "providers/celery/tests/unit/celery/test_version_compat.py", + "providers/databricks/tests/unit/databricks/test_version_compat.py", + "providers/dbt/tests/unit/dbt/test_version_compat.py", + "providers/microsoft/tests/unit/microsoft/test_version_compat.py", + "providers/yandex/tests/unit/yandex/test_version_compat.py", "providers/cloudant/tests/unit/cloudant/test_cloudant_fake.py", "providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor_types.py", "providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor_utils.py", From c8c916a103bece290a4fe2a2822223498e3ecffd Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 25 Feb 2025 14:21:47 +0530 Subject: [PATCH 10/30] fixing ut --- tests/always/test_project_structure.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index f847277cfde41..0bb020bd2a0d8 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -83,8 +83,8 @@ def test_providers_modules_should_have_tests(self): "providers/celery/tests/unit/celery/executors/test_default_celery.py", "providers/celery/tests/unit/celery/test_version_compat.py", "providers/databricks/tests/unit/databricks/test_version_compat.py", - "providers/dbt/tests/unit/dbt/test_version_compat.py", - "providers/microsoft/tests/unit/microsoft/test_version_compat.py", + "providers/dbt/cloud/tests/unit/dbt/test_version_compat.py", + "providers/microsoft/azure/tests/unit/microsoft/test_version_compat.py", "providers/yandex/tests/unit/yandex/test_version_compat.py", "providers/cloudant/tests/unit/cloudant/test_cloudant_fake.py", "providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor_types.py", From 41d11a5bc089b4a552d38e01368fd8c74bac0844 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 25 Feb 2025 14:55:18 +0530 Subject: [PATCH 11/30] fixing --- .../dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py | 2 +- .../src/airflow/providers/dbt/{ => cloud}/version_compat.py | 0 .../airflow/providers/microsoft/azure/operators/data_factory.py | 2 +- .../src/airflow/providers/microsoft/azure/operators/powerbi.py | 2 +- .../src/airflow/providers/microsoft/azure/operators/synapse.py | 2 +- .../airflow/providers/microsoft/{ => azure}/version_compat.py | 0 6 files changed, 4 insertions(+), 4 deletions(-) rename providers/dbt/cloud/src/airflow/providers/dbt/{ => cloud}/version_compat.py (100%) rename providers/microsoft/azure/src/airflow/providers/microsoft/{ => azure}/version_compat.py (100%) diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py index 1ac2cb8c4c2a8..0e34fdbe730b4 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py @@ -33,7 +33,7 @@ ) from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger from airflow.providers.dbt.cloud.utils.openlineage import generate_openlineage_events_from_dbt_cloud_run -from airflow.providers.dbt.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.providers.dbt.cloud.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.providers.openlineage.extractors import OperatorLineage diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/version_compat.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py similarity index 100% rename from providers/dbt/cloud/src/airflow/providers/dbt/version_compat.py rename to providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py index 1a2b1c30c6b69..e918d2f783721 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py @@ -39,7 +39,7 @@ from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context -from airflow.providers.microsoft.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py index 5674e0aee59c8..1077a4de7ffdf 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -31,7 +31,7 @@ from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context -from airflow.providers.microsoft.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py index 6ad9927548154..0cfa763e00e42 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py @@ -37,7 +37,7 @@ from airflow.utils.context import Context from azure.synapse.spark.models import SparkBatchJobOptions -from airflow.providers.microsoft.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/version_compat.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py similarity index 100% rename from providers/microsoft/azure/src/airflow/providers/microsoft/version_compat.py rename to providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py From 4f1ace97d5087ed93c428f2d1e1103a6a5d899e5 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Fri, 28 Feb 2025 15:14:29 +0530 Subject: [PATCH 12/30] import from common compat --- .../providers/dbt/cloud/operators/dbt.py | 2 +- .../providers/dbt/cloud/version_compat.py | 36 ------------------- .../src/airflow/providers/yandex/links/yq.py | 2 +- .../providers/yandex/version_compat.py | 36 ------------------- tests/always/test_project_structure.py | 4 --- 5 files changed, 2 insertions(+), 78 deletions(-) delete mode 100644 providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py delete mode 100644 providers/yandex/src/airflow/providers/yandex/version_compat.py diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py index 0e34fdbe730b4..4010c6ee34603 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py @@ -25,6 +25,7 @@ from airflow.configuration import conf from airflow.models import BaseOperator, XCom +from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS from airflow.providers.dbt.cloud.hooks.dbt import ( DbtCloudHook, DbtCloudJobRunException, @@ -33,7 +34,6 @@ ) from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger from airflow.providers.dbt.cloud.utils.openlineage import generate_openlineage_events_from_dbt_cloud_run -from airflow.providers.dbt.cloud.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.providers.openlineage.extractors import OperatorLineage diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py deleted file mode 100644 index 21e7170194e36..0000000000000 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py +++ /dev/null @@ -1,36 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY -# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS -# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT -# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE -# -from __future__ import annotations - - -def get_base_airflow_version_tuple() -> tuple[int, int, int]: - from packaging.version import Version - - from airflow import __version__ - - airflow_version = Version(__version__) - return airflow_version.major, airflow_version.minor, airflow_version.micro - - -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) -AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/yandex/src/airflow/providers/yandex/links/yq.py b/providers/yandex/src/airflow/providers/yandex/links/yq.py index 88112e538b931..80d57b533ae59 100644 --- a/providers/yandex/src/airflow/providers/yandex/links/yq.py +++ b/providers/yandex/src/airflow/providers/yandex/links/yq.py @@ -30,7 +30,7 @@ # TODO: Remove once provider drops support for Airflow 2 from airflow.utils.context import Context -from airflow.providers.yandex.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink diff --git a/providers/yandex/src/airflow/providers/yandex/version_compat.py b/providers/yandex/src/airflow/providers/yandex/version_compat.py deleted file mode 100644 index 21e7170194e36..0000000000000 --- a/providers/yandex/src/airflow/providers/yandex/version_compat.py +++ /dev/null @@ -1,36 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY -# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS -# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT -# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE -# -from __future__ import annotations - - -def get_base_airflow_version_tuple() -> tuple[int, int, int]: - from packaging.version import Version - - from airflow import __version__ - - airflow_version = Version(__version__) - return airflow_version.major, airflow_version.minor, airflow_version.micro - - -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) -AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index 597aa6a90ef03..548679757104d 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -83,10 +83,6 @@ def test_providers_modules_should_have_tests(self): "providers/celery/tests/unit/celery/executors/test_celery_executor_utils.py", "providers/celery/tests/unit/celery/executors/test_default_celery.py", "providers/celery/tests/unit/celery/test_version_compat.py", - "providers/databricks/tests/unit/databricks/test_version_compat.py", - "providers/dbt/cloud/tests/unit/dbt/test_version_compat.py", - "providers/microsoft/azure/tests/unit/microsoft/test_version_compat.py", - "providers/yandex/tests/unit/yandex/test_version_compat.py", "providers/cloudant/tests/unit/cloudant/test_cloudant_fake.py", "providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor_types.py", "providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor_utils.py", From 67b5584ab08f6f0e717e2fdc3586eea28f5f8706 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Fri, 28 Feb 2025 17:43:38 +0530 Subject: [PATCH 13/30] generated provider_dependencies.json --- generated/provider_dependencies.json | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 2d1cbdb47f935..f669b8f0424f1 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -478,6 +478,7 @@ "devel-deps": [], "plugins": [], "cross-providers-deps": [ + "common.compat", "http", "openlineage" ], @@ -1385,7 +1386,9 @@ ], "devel-deps": [], "plugins": [], - "cross-providers-deps": [], + "cross-providers-deps": [ + "common.compat" + ], "excluded-python-versions": [], "state": "ready" }, From 20a6fc5c7b31db50fcbcbe779386cd8a67c84974 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Fri, 28 Feb 2025 18:56:50 +0530 Subject: [PATCH 14/30] fixing static checks --- airflow/serialization/serialized_objects.py | 2 +- providers/dbt/cloud/README.rst | 15 ++++++++------- providers/dbt/cloud/pyproject.toml | 3 +++ .../providers/dbt/cloud/get_provider_info.py | 5 ++++- providers/yandex/README.rst | 19 +++++++++++++++++++ providers/yandex/pyproject.toml | 7 +++++++ .../providers/yandex/get_provider_info.py | 1 + .../definitions/_internal/abstractoperator.py | 2 +- .../airflow/sdk/definitions/mappedoperator.py | 2 +- .../local_commands/test_plugins_command.py | 2 +- 10 files changed, 46 insertions(+), 12 deletions(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 4c1a2bc6f405b..b10ae0279cd48 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -50,6 +50,7 @@ from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.xcom_arg import SchedulerXComArg, deserialize_xcom_arg from airflow.providers_manager import ProvidersManager +from airflow.sdk import BaseOperatorLink, XComOperatorLink from airflow.sdk.definitions.asset import ( Asset, AssetAlias, @@ -63,7 +64,6 @@ BaseAsset, ) from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink, XComOperatorLink from airflow.sdk.definitions.mappedoperator import MappedOperator from airflow.sdk.definitions.param import Param, ParamsDict from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup diff --git a/providers/dbt/cloud/README.rst b/providers/dbt/cloud/README.rst index f264809b8a721..ab1cf3deaf071 100644 --- a/providers/dbt/cloud/README.rst +++ b/providers/dbt/cloud/README.rst @@ -69,15 +69,16 @@ You can install such cross-provider dependencies when installing from PyPI. For .. code-block:: bash - pip install apache-airflow-providers-dbt-cloud[http] + pip install apache-airflow-providers-dbt-cloud[common.compat] -============================================================================================================== =============== -Dependent package Extra -============================================================================================================== =============== -`apache-airflow-providers-http `_ ``http`` -`apache-airflow-providers-openlineage `_ ``openlineage`` -============================================================================================================== =============== +================================================================================================================== ================= +Dependent package Extra +================================================================================================================== ================= +`apache-airflow-providers-common-compat `_ ``common.compat`` +`apache-airflow-providers-http `_ ``http`` +`apache-airflow-providers-openlineage `_ ``openlineage`` +================================================================================================================== ================= The changelog for the provider package can be found in the `changelog `_. diff --git a/providers/dbt/cloud/pyproject.toml b/providers/dbt/cloud/pyproject.toml index 93c318288db11..3a6467d39f76b 100644 --- a/providers/dbt/cloud/pyproject.toml +++ b/providers/dbt/cloud/pyproject.toml @@ -68,6 +68,9 @@ dependencies = [ "openlineage" = [ "apache-airflow-providers-openlineage>=1.7.0", ] +"common.compat" = [ + "apache-airflow-providers-common-compat" +] [project.urls] "Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-dbt-cloud/4.2.0" diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/get_provider_info.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/get_provider_info.py index f944e6403a2b3..3a0b75df84ab0 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/get_provider_info.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/get_provider_info.py @@ -98,5 +98,8 @@ def get_provider_info(): "asgiref>=2.3.0", "aiohttp>=3.9.2", ], - "optional-dependencies": {"openlineage": ["apache-airflow-providers-openlineage>=1.7.0"]}, + "optional-dependencies": { + "openlineage": ["apache-airflow-providers-openlineage>=1.7.0"], + "common.compat": ["apache-airflow-providers-common-compat"], + }, } diff --git a/providers/yandex/README.rst b/providers/yandex/README.rst index 8aa87f0a8b145..0df375d716604 100644 --- a/providers/yandex/README.rst +++ b/providers/yandex/README.rst @@ -60,5 +60,24 @@ PIP package Version required ``yandex-query-client`` ``>=0.1.4`` ======================= =============================================================== +Cross provider package dependencies +----------------------------------- + +Those are dependencies that might be needed in order to use all the features of the package. +You need to install the specified provider packages in order to use them. + +You can install such cross-provider dependencies when installing from PyPI. For example: + +.. code-block:: bash + + pip install apache-airflow-providers-yandex[common.compat] + + +================================================================================================================== ================= +Dependent package Extra +================================================================================================================== ================= +`apache-airflow-providers-common-compat `_ ``common.compat`` +================================================================================================================== ================= + The changelog for the provider package can be found in the `changelog `_. diff --git a/providers/yandex/pyproject.toml b/providers/yandex/pyproject.toml index 8cd5fd8a17f2d..2cc00aa0d65ca 100644 --- a/providers/yandex/pyproject.toml +++ b/providers/yandex/pyproject.toml @@ -64,6 +64,13 @@ dependencies = [ "yandex-query-client>=0.1.4", ] +# The optional dependencies should be modified in place in the generated file +# Any change in the dependencies is preserved when the file is regenerated +[project.optional-dependencies] +"common.compat" = [ + "apache-airflow-providers-common-compat" +] + [project.urls] "Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-yandex/4.0.1" "Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-yandex/4.0.1/changelog.html" diff --git a/providers/yandex/src/airflow/providers/yandex/get_provider_info.py b/providers/yandex/src/airflow/providers/yandex/get_provider_info.py index bfcfd50705aff..e9c2a048a089d 100644 --- a/providers/yandex/src/airflow/providers/yandex/get_provider_info.py +++ b/providers/yandex/src/airflow/providers/yandex/get_provider_info.py @@ -123,4 +123,5 @@ def get_provider_info(): "yandexcloud>=0.308.0,!=0.329.0,!=0.330.0,!=0.331.0,!=0.332.0,!=0.333.0", "yandex-query-client>=0.1.4", ], + "optional-dependencies": {"common.compat": ["apache-airflow-providers-common-compat"]}, } diff --git a/task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py b/task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py index 507349abfdca4..165af2e830737 100644 --- a/task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py @@ -41,8 +41,8 @@ if TYPE_CHECKING: import jinja2 + from airflow.sdk import BaseOperatorLink from airflow.sdk.definitions.baseoperator import BaseOperator - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.sdk.definitions.context import Context from airflow.sdk.definitions.dag import DAG from airflow.sdk.definitions.mappedoperator import MappedOperator diff --git a/task_sdk/src/airflow/sdk/definitions/mappedoperator.py b/task_sdk/src/airflow/sdk/definitions/mappedoperator.py index 2608d4eefebf0..622c3c0ac7773 100644 --- a/task_sdk/src/airflow/sdk/definitions/mappedoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/mappedoperator.py @@ -69,8 +69,8 @@ OperatorExpandKwargsArgument, ) from airflow.models.xcom_arg import XComArg + from airflow.sdk import BaseOperatorLink from airflow.sdk.definitions.baseoperator import BaseOperator - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.sdk.definitions.dag import DAG from airflow.sdk.definitions.param import ParamsDict from airflow.sdk.types import Operator diff --git a/tests/cli/commands/local_commands/test_plugins_command.py b/tests/cli/commands/local_commands/test_plugins_command.py index 7d5f09490662b..d780d5886320e 100644 --- a/tests/cli/commands/local_commands/test_plugins_command.py +++ b/tests/cli/commands/local_commands/test_plugins_command.py @@ -27,7 +27,7 @@ from airflow.cli.commands.local_commands import plugins_command from airflow.listeners.listener import get_listener_manager from airflow.plugins_manager import AirflowPlugin -from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink +from airflow.sdk import BaseOperatorLink from tests.plugins.test_plugin import AirflowTestPlugin as ComplexAirflowPlugin from tests_common.test_utils.mock_plugins import mock_plugin_manager From d229a5444d8abf67f5f4f3eb4f714bc23e314cc9 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Fri, 28 Feb 2025 19:28:03 +0530 Subject: [PATCH 15/30] fixing static checks --- airflow/serialization/serialized_objects.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index b10ae0279cd48..b229e39f70d7a 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -50,7 +50,6 @@ from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.xcom_arg import SchedulerXComArg, deserialize_xcom_arg from airflow.providers_manager import ProvidersManager -from airflow.sdk import BaseOperatorLink, XComOperatorLink from airflow.sdk.definitions.asset import ( Asset, AssetAlias, @@ -64,6 +63,7 @@ BaseAsset, ) from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator +from airflow.sdk.definitions.baseoperatorlink import XComOperatorLink from airflow.sdk.definitions.mappedoperator import MappedOperator from airflow.sdk.definitions.param import Param, ParamsDict from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup @@ -98,6 +98,7 @@ from airflow.models import DagRun from airflow.models.expandinput import ExpandInput + from airflow.sdk import BaseOperatorLink from airflow.sdk.definitions._internal.node import DAGNode from airflow.sdk.types import Operator from airflow.serialization.json_schema import Validator From 6a6e5be2cec53bed0fbeb71484f1e3805b3f7a74 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Fri, 28 Feb 2025 20:09:53 +0530 Subject: [PATCH 16/30] spelling ignore --- docs/spelling_wordlist.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 2f7a408bb1a47..127ac0c709233 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -171,6 +171,8 @@ BaseHook BaseObject BaseOperator baseOperator +BaseOperatorLink +baseoperatorlink basestring basetaskrunner BaseView From 448d27ab6423fcb9d0f6d07e5e5aaa38ed2c1a8a Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Fri, 28 Feb 2025 21:05:11 +0530 Subject: [PATCH 17/30] anotehr try --- docs/apache-airflow/public-airflow-interface.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow/public-airflow-interface.rst b/docs/apache-airflow/public-airflow-interface.rst index 23b5dffe4d33f..417e0a1e01493 100644 --- a/docs/apache-airflow/public-airflow-interface.rst +++ b/docs/apache-airflow/public-airflow-interface.rst @@ -275,7 +275,7 @@ they can be defined by the Operators, but plugins allow you to override the link :glob: :maxdepth: 1 - _api/airflow/models/baseoperatorlink/index + _api/airflow/sdk/definitions/baseoperatorlink/index You can read more about the Extra Links in :doc:`/howto/define-extra-link`. From 69ad97741daf6a6242fbb7755a0de63949d8644f Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Fri, 28 Feb 2025 22:27:53 +0530 Subject: [PATCH 18/30] anotehr try --- airflow/models/baseoperatorlink.py | 24 +++++++++++++++++++ .../public-airflow-interface.rst | 2 +- 2 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 airflow/models/baseoperatorlink.py diff --git a/airflow/models/baseoperatorlink.py b/airflow/models/baseoperatorlink.py new file mode 100644 index 0000000000000..77dff46199633 --- /dev/null +++ b/airflow/models/baseoperatorlink.py @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Re exporting the new baseoperatorlink module from Task SDK for backward compatibility.""" + +from __future__ import annotations + +from airflow.sdk import BaseOperatorLink + +__all__ = ["BaseOperatorLink"] diff --git a/docs/apache-airflow/public-airflow-interface.rst b/docs/apache-airflow/public-airflow-interface.rst index 417e0a1e01493..ab0fabfcac43b 100644 --- a/docs/apache-airflow/public-airflow-interface.rst +++ b/docs/apache-airflow/public-airflow-interface.rst @@ -275,7 +275,7 @@ they can be defined by the Operators, but plugins allow you to override the link :glob: :maxdepth: 1 - _api/airflow/sdk/definitions/baseoperatorlink/index + _api/airflow/models/baseoperatorlink/index You can read more about the Extra Links in :doc:`/howto/define-extra-link`. From ea7ebe3c175659947e42d9624a4446c8765f8a4f Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 3 Mar 2025 13:28:48 +0530 Subject: [PATCH 19/30] fixing imports --- airflow/models/baseoperatorlink.py | 24 ------------- .../databricks/operators/databricks.py | 2 +- .../databricks/plugins/databricks_workflow.py | 2 +- .../providers/databricks/version_compat.py | 36 ------------------- .../plugins/test_databricks_workflow.py | 2 +- .../microsoft/azure/version_compat.py | 36 ------------------- 6 files changed, 3 insertions(+), 99 deletions(-) delete mode 100644 airflow/models/baseoperatorlink.py delete mode 100644 providers/databricks/src/airflow/providers/databricks/version_compat.py delete mode 100644 providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py diff --git a/airflow/models/baseoperatorlink.py b/airflow/models/baseoperatorlink.py deleted file mode 100644 index 77dff46199633..0000000000000 --- a/airflow/models/baseoperatorlink.py +++ /dev/null @@ -1,24 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""Re exporting the new baseoperatorlink module from Task SDK for backward compatibility.""" - -from __future__ import annotations - -from airflow.sdk import BaseOperatorLink - -__all__ = ["BaseOperatorLink"] diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py index 36b0fee98e82c..4655cd40c1053 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py @@ -30,6 +30,7 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.models import BaseOperator, XCom +from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS from airflow.providers.databricks.hooks.databricks import DatabricksHook, RunLifeCycleState, RunState from airflow.providers.databricks.operators.databricks_workflow import ( DatabricksWorkflowTaskGroup, @@ -41,7 +42,6 @@ ) from airflow.providers.databricks.triggers.databricks import DatabricksExecutionTrigger from airflow.providers.databricks.utils.databricks import normalise_json_content, validate_trigger_event -from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey diff --git a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py index 03e4e4e8fe6a6..fff20ae773759 100644 --- a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py +++ b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py @@ -32,8 +32,8 @@ from airflow.models.taskinstance import TaskInstance, TaskInstanceKey from airflow.models.xcom import XCom from airflow.plugins_manager import AirflowPlugin +from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS from airflow.providers.databricks.hooks.databricks import DatabricksHook -from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.airflow_flask_app import AirflowApp from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import NEW_SESSION, provide_session diff --git a/providers/databricks/src/airflow/providers/databricks/version_compat.py b/providers/databricks/src/airflow/providers/databricks/version_compat.py deleted file mode 100644 index 21e7170194e36..0000000000000 --- a/providers/databricks/src/airflow/providers/databricks/version_compat.py +++ /dev/null @@ -1,36 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY -# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS -# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT -# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE -# -from __future__ import annotations - - -def get_base_airflow_version_tuple() -> tuple[int, int, int]: - from packaging.version import Version - - from airflow import __version__ - - airflow_version = Version(__version__) - return airflow_version.major, airflow_version.minor, airflow_version.micro - - -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) -AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py b/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py index 10ecd97b3e6c4..c944317490970 100644 --- a/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py +++ b/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py @@ -21,7 +21,7 @@ import pytest -from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: pytest.skip( diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py deleted file mode 100644 index 21e7170194e36..0000000000000 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py +++ /dev/null @@ -1,36 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY -# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS -# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT -# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE -# -from __future__ import annotations - - -def get_base_airflow_version_tuple() -> tuple[int, int, int]: - from packaging.version import Version - - from airflow import __version__ - - airflow_version = Version(__version__) - return airflow_version.major, airflow_version.minor, airflow_version.micro - - -AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) -AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) From 7d6253863432c98c1365e00f2652cb7af06e2d85 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 3 Mar 2025 13:29:24 +0530 Subject: [PATCH 20/30] excluding docs for extra link --- docs/apache-airflow/public-airflow-interface.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/apache-airflow/public-airflow-interface.rst b/docs/apache-airflow/public-airflow-interface.rst index ab0fabfcac43b..08d5101283ebf 100644 --- a/docs/apache-airflow/public-airflow-interface.rst +++ b/docs/apache-airflow/public-airflow-interface.rst @@ -264,6 +264,9 @@ can be implemented to respond to DAG/Task lifecycle events. You can read more about Listeners in :doc:`administration-and-deployment/listeners`. +.. + TODO AIP-72: This class has been moved to task sdk but we cannot add a doc reference for it yet because task sdk doesn't have rendered docs yet. + Extra Links ----------- From 4045d09cabc8b6bfae7fee43c6af94b2f6695281 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 3 Mar 2025 17:01:25 +0530 Subject: [PATCH 21/30] cleaning the mess --- .../databricks/operators/databricks.py | 2 +- .../databricks/plugins/databricks_workflow.py | 2 +- .../providers/databricks/version_compat.py | 36 +++++++++++++++++++ .../plugins/test_databricks_workflow.py | 2 +- .../microsoft/azure/version_compat.py | 36 +++++++++++++++++++ 5 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 providers/databricks/src/airflow/providers/databricks/version_compat.py create mode 100644 providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py index 4655cd40c1053..36b0fee98e82c 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py @@ -30,7 +30,6 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.models import BaseOperator, XCom -from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS from airflow.providers.databricks.hooks.databricks import DatabricksHook, RunLifeCycleState, RunState from airflow.providers.databricks.operators.databricks_workflow import ( DatabricksWorkflowTaskGroup, @@ -42,6 +41,7 @@ ) from airflow.providers.databricks.triggers.databricks import DatabricksExecutionTrigger from airflow.providers.databricks.utils.databricks import normalise_json_content, validate_trigger_event +from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey diff --git a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py index b822d3ea60a62..6441fa5cd41f6 100644 --- a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py +++ b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py @@ -33,8 +33,8 @@ from airflow.models.taskinstance import TaskInstance, TaskInstanceKey from airflow.models.xcom import XCom from airflow.plugins_manager import AirflowPlugin -from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS from airflow.providers.databricks.hooks.databricks import DatabricksHook +from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import TaskInstanceState diff --git a/providers/databricks/src/airflow/providers/databricks/version_compat.py b/providers/databricks/src/airflow/providers/databricks/version_compat.py new file mode 100644 index 0000000000000..21e7170194e36 --- /dev/null +++ b/providers/databricks/src/airflow/providers/databricks/version_compat.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY +# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS +# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT +# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE +# +from __future__ import annotations + + +def get_base_airflow_version_tuple() -> tuple[int, int, int]: + from packaging.version import Version + + from airflow import __version__ + + airflow_version = Version(__version__) + return airflow_version.major, airflow_version.minor, airflow_version.micro + + +AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) +AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py b/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py index 945a432cc7315..c41bd9690b1f5 100644 --- a/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py +++ b/providers/databricks/tests/unit/databricks/plugins/test_databricks_workflow.py @@ -21,7 +21,7 @@ import pytest -from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: pytest.skip( diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py new file mode 100644 index 0000000000000..21e7170194e36 --- /dev/null +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY +# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS +# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT +# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE +# +from __future__ import annotations + + +def get_base_airflow_version_tuple() -> tuple[int, int, int]: + from packaging.version import Version + + from airflow import __version__ + + airflow_version = Version(__version__) + return airflow_version.major, airflow_version.minor, airflow_version.micro + + +AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) +AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) From 72605cd3de453304ec0c8ef0733dfa8b5b0fb0bb Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 3 Mar 2025 17:03:25 +0530 Subject: [PATCH 22/30] excluding docs for extra link --- docs/conf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/conf.py b/docs/conf.py index 6285efea0f56e..c9aa8b50bc76f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -213,6 +213,7 @@ # Included in the cluster-policies doc "_api/airflow/policies/index.rst", "README.rst", + "TODO", ] elif PACKAGE_NAME.startswith("apache-airflow-providers-"): extensions.extend( From 4b2d2a7a14d28c88519dcf150e7528ed6ec3cc1f Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 3 Mar 2025 17:41:24 +0530 Subject: [PATCH 23/30] generated provider_dependencies.json --- generated/provider_dependencies.json | 1 - 1 file changed, 1 deletion(-) diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index d0dc5b3f661be..fee4f374e0350 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -464,7 +464,6 @@ } ], "cross-providers-deps": [ - "common.compat", "common.sql" ], "excluded-python-versions": [], From 61204affd52978d259c4fc3f01ba99c1477ffef8 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 3 Mar 2025 18:47:36 +0530 Subject: [PATCH 24/30] docs fix --- docs/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/conf.py b/docs/conf.py index c9aa8b50bc76f..db4b247b9e25e 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -213,7 +213,7 @@ # Included in the cluster-policies doc "_api/airflow/policies/index.rst", "README.rst", - "TODO", + "_api/airflow/models/baseoperatorlink/index.rst", ] elif PACKAGE_NAME.startswith("apache-airflow-providers-"): extensions.extend( From 909d9555a037389ffa2b9d7dcc487a997dd31d1e Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 3 Mar 2025 19:48:34 +0530 Subject: [PATCH 25/30] docs fix --- docs/conf.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index db4b247b9e25e..9f0a9686facfd 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -213,7 +213,6 @@ # Included in the cluster-policies doc "_api/airflow/policies/index.rst", "README.rst", - "_api/airflow/models/baseoperatorlink/index.rst", ] elif PACKAGE_NAME.startswith("apache-airflow-providers-"): extensions.extend( @@ -278,7 +277,6 @@ def _get_rst_filepath_from_path(filepath: pathlib.Path): models_included: set[str] = { "baseoperator.py", - "baseoperatorlink.py", "connection.py", "dag.py", "dagrun.py", From 3532fc32b953e50e251524f3857338bb205f88bf Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 3 Mar 2025 23:59:02 +0530 Subject: [PATCH 26/30] docs fix --- docs/apache-airflow/public-airflow-interface.rst | 7 ------- 1 file changed, 7 deletions(-) diff --git a/docs/apache-airflow/public-airflow-interface.rst b/docs/apache-airflow/public-airflow-interface.rst index 08d5101283ebf..d1012f5784ac5 100644 --- a/docs/apache-airflow/public-airflow-interface.rst +++ b/docs/apache-airflow/public-airflow-interface.rst @@ -273,13 +273,6 @@ Extra Links Extra links are dynamic links that could be added to Airflow independently from custom Operators. Normally they can be defined by the Operators, but plugins allow you to override the links on a global level. -.. toctree:: - :includehidden: - :glob: - :maxdepth: 1 - - _api/airflow/models/baseoperatorlink/index - You can read more about the Extra Links in :doc:`/howto/define-extra-link`. Using Public Interface to integrate with external services and applications From aa2dda18a48596c2891dca8d13a33e5a05359215 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 4 Mar 2025 21:05:18 +0530 Subject: [PATCH 27/30] review from ash --- .pre-commit-config.yaml | 11 ++++------ contributing-docs/08_static_code_checks.rst | 2 +- newsfragments/47008.significant.rst | 22 ------------------- newsfragments/aip-72.significant.rst | 11 +++++++++- .../google/cloud/links/datafusion.py | 11 +++++----- task_sdk/src/airflow/sdk/__init__.py | 2 ++ .../airflow/sdk/definitions/mappedoperator.py | 2 +- 7 files changed, 23 insertions(+), 38 deletions(-) delete mode 100644 newsfragments/47008.significant.rst diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5759df5a6fb0f..168d02854267d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -695,7 +695,7 @@ repos: language: pygrep name: Check BaseOperatorLink core imports description: Make sure BaseOperatorLink is imported from airflow.sdk in core - entry: ".*from airflow\\.sdk\\..* import.* BaseOperatorLink\\b" + entry: "[ ]*from airflow\\.sdk import.* BaseOperatorLink\\b" files: \.py$ pass_filenames: true echo: True @@ -709,12 +709,9 @@ repos: ^dev/provider_packages/.*$ - id: check-base-operator-usage language: pygrep - name: Check BaseOperator[Link] other imports - description: Make sure BaseOperator is imported from airflow.models and BaseOperatorLink is imported from airflow.models or airflow.sdk.* outside of core - entry: > - (from airflow\\.models\\.baseoperator import.* BaseOperator\\b| - from airflow\\.models import.* BaseOperatorLink\\b| - from airflow\\.sdk\\..* import.* BaseOperatorLink\\b) + name: Check BaseOperator other imports + description: Make sure BaseOperator is imported from airflow.models outside of core + entry: "from airflow\\.models\\.baseoperator import.* BaseOperator" pass_filenames: true files: > (?x) diff --git a/contributing-docs/08_static_code_checks.rst b/contributing-docs/08_static_code_checks.rst index 8eccb20a304ff..e1dc99d7336ad 100644 --- a/contributing-docs/08_static_code_checks.rst +++ b/contributing-docs/08_static_code_checks.rst @@ -132,7 +132,7 @@ require Breeze Docker image to be built locally. +-----------------------------------------------------------+--------------------------------------------------------+---------+ | check-base-operator-usage | * Check BaseOperator core imports | | | | * Check BaseOperatorLink core imports | | -| | * Check BaseOperator[Link] other imports | | +| | * Check BaseOperator other imports | | +-----------------------------------------------------------+--------------------------------------------------------+---------+ | check-boring-cyborg-configuration | Checks for Boring Cyborg configuration consistency | | +-----------------------------------------------------------+--------------------------------------------------------+---------+ diff --git a/newsfragments/47008.significant.rst b/newsfragments/47008.significant.rst deleted file mode 100644 index 6cff807d56243..0000000000000 --- a/newsfragments/47008.significant.rst +++ /dev/null @@ -1,22 +0,0 @@ -``BaseOperatorLink`` has now been moved into the task SDK to be consumed by DAG authors to write custom operator links. - -Any occurrences of imports from ``airflow.models.baseoperatorlink`` will need to be updated to ``airflow.sdk.definitions.baseoperatorlink`` - -* Types of change - - * [x] Dag changes - * [ ] Config changes - * [ ] API changes - * [ ] CLI changes - * [ ] Behaviour changes - * [x] Plugin changes - * [ ] Dependency changes - * [ ] Code interface changes - -* Migration rules needed - - * ruff - - * AIR302 - - * [ ] ``airflow.models.baseoperatorlink`` → ``airflow.sdk.definitions.baseoperatorlink`` diff --git a/newsfragments/aip-72.significant.rst b/newsfragments/aip-72.significant.rst index ed95d3031471e..cf08e164d83fd 100644 --- a/newsfragments/aip-72.significant.rst +++ b/newsfragments/aip-72.significant.rst @@ -42,6 +42,9 @@ As part of this change the following breaking changes have occurred: It is recommended that you replace such a custom operator with a deferrable sensor, a condition or another triggering mechanism. +- ``BaseOperatorLink`` has now been moved into the task SDK to be consumed by DAG authors to write custom operator links. + + Any occurrences of imports from ``airflow.models.baseoperatorlink`` will need to be updated to ``airflow.sdk.definitions.baseoperatorlink`` * Types of change @@ -50,7 +53,7 @@ As part of this change the following breaking changes have occurred: * [ ] API changes * [ ] CLI changes * [x] Behaviour changes - * [ ] Plugin changes + * [x] Plugin changes * [ ] Dependency changes * [ ] Code interface changes @@ -60,3 +63,9 @@ As part of this change the following breaking changes have occurred: * [x] ``core.task_runner`` * [x] ``core.enable_xcom_pickling`` + + * ruff + + * AIR302 + + * [ ] ``airflow.models.baseoperatorlink`` → ``airflow.sdk`` diff --git a/providers/google/src/airflow/providers/google/cloud/links/datafusion.py b/providers/google/src/airflow/providers/google/cloud/links/datafusion.py index f8522f044a5d5..e3a814d23d39f 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/links/datafusion.py @@ -22,12 +22,6 @@ from typing import TYPE_CHECKING, ClassVar from airflow.models import XCom - -if TYPE_CHECKING: - from airflow.models import BaseOperator - from airflow.models.taskinstancekey import TaskInstanceKey - from airflow.utils.context import Context - from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: @@ -35,6 +29,11 @@ else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] +if TYPE_CHECKING: + from airflow.models import BaseOperator + from airflow.models.taskinstancekey import TaskInstanceKey + from airflow.utils.context import Context + BASE_LINK = "https://console.cloud.google.com/data-fusion" DATAFUSION_INSTANCE_LINK = BASE_LINK + "/locations/{region}/instances/{instance_name}?project={project_id}" DATAFUSION_PIPELINES_LINK = "{uri}/cdap/ns/{namespace}/pipelines" diff --git a/task_sdk/src/airflow/sdk/__init__.py b/task_sdk/src/airflow/sdk/__init__.py index a851c9928769b..5f4ecfd1d9007 100644 --- a/task_sdk/src/airflow/sdk/__init__.py +++ b/task_sdk/src/airflow/sdk/__init__.py @@ -26,6 +26,7 @@ "AssetAny", "AssetWatcher", "BaseOperator", + "BaseOperatorLink", "Connection", "Context", "DAG", @@ -50,6 +51,7 @@ from airflow.sdk.definitions.assets.decorators import asset from airflow.sdk.definitions.assets.metadata import Metadata from airflow.sdk.definitions.baseoperator import BaseOperator + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.sdk.definitions.connection import Connection from airflow.sdk.definitions.context import Context, get_current_context, get_parsing_context from airflow.sdk.definitions.dag import DAG, dag diff --git a/task_sdk/src/airflow/sdk/definitions/mappedoperator.py b/task_sdk/src/airflow/sdk/definitions/mappedoperator.py index 622c3c0ac7773..2608d4eefebf0 100644 --- a/task_sdk/src/airflow/sdk/definitions/mappedoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/mappedoperator.py @@ -69,8 +69,8 @@ OperatorExpandKwargsArgument, ) from airflow.models.xcom_arg import XComArg - from airflow.sdk import BaseOperatorLink from airflow.sdk.definitions.baseoperator import BaseOperator + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.sdk.definitions.dag import DAG from airflow.sdk.definitions.param import ParamsDict from airflow.sdk.types import Operator From 9920d1710462461e61bff23b00f6200e5d5c71d3 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 4 Mar 2025 23:43:55 +0530 Subject: [PATCH 28/30] support spaces and tabs too --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 168d02854267d..7339d5058000b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -695,7 +695,7 @@ repos: language: pygrep name: Check BaseOperatorLink core imports description: Make sure BaseOperatorLink is imported from airflow.sdk in core - entry: "[ ]*from airflow\\.sdk import.* BaseOperatorLink\\b" + entry: "^\\s*from airflow\\.sdk import.* BaseOperatorLink\\b" files: \.py$ pass_filenames: true echo: True From 8789738c41c3875e148414a49f7c6d3f36666168 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Wed, 5 Mar 2025 11:24:28 +0530 Subject: [PATCH 29/30] fixing the precommit --- .pre-commit-config.yaml | 4 ++-- .../src/airflow/sdk/definitions/_internal/abstractoperator.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7339d5058000b..38a27df7f5363 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -694,8 +694,8 @@ repos: - id: check-base-operator-usage language: pygrep name: Check BaseOperatorLink core imports - description: Make sure BaseOperatorLink is imported from airflow.sdk in core - entry: "^\\s*from airflow\\.sdk import.* BaseOperatorLink\\b" + description: Make sure BaseOperatorLink is not imported from airflow.models in core + entry: "^\\s*from airflow\\.models\\.baseoperatorlink import BaseOperatorLink\\b" files: \.py$ pass_filenames: true echo: True diff --git a/task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py b/task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py index 165af2e830737..507349abfdca4 100644 --- a/task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py @@ -41,8 +41,8 @@ if TYPE_CHECKING: import jinja2 - from airflow.sdk import BaseOperatorLink from airflow.sdk.definitions.baseoperator import BaseOperator + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.sdk.definitions.context import Context from airflow.sdk.definitions.dag import DAG from airflow.sdk.definitions.mappedoperator import MappedOperator From 4236711ecedbfab815996abd9929e90fe30d114b Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Thu, 6 Mar 2025 11:35:22 +0000 Subject: [PATCH 30/30] Apply suggestions from code review --- .../amazon/src/airflow/providers/amazon/aws/links/base_aws.py | 2 +- .../src/airflow/providers/databricks/operators/databricks.py | 2 +- .../airflow/providers/databricks/plugins/databricks_workflow.py | 2 +- .../dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py | 2 +- .../google/src/airflow/providers/google/cloud/links/base.py | 2 +- .../src/airflow/providers/google/cloud/links/datafusion.py | 2 +- .../google/src/airflow/providers/google/cloud/links/dataproc.py | 2 +- .../providers/google/cloud/operators/dataproc_metastore.py | 2 +- .../google/marketing_platform/links/analytics_admin.py | 2 +- .../airflow/providers/microsoft/azure/operators/data_factory.py | 2 +- .../src/airflow/providers/microsoft/azure/operators/powerbi.py | 2 +- .../src/airflow/providers/microsoft/azure/operators/synapse.py | 2 +- .../src/airflow/providers/standard/operators/trigger_dagrun.py | 2 +- .../src/airflow/providers/standard/sensors/external_task.py | 2 +- providers/yandex/src/airflow/providers/yandex/links/yq.py | 2 +- 15 files changed, 15 insertions(+), 15 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py b/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py index 772228af8a61a..47330c473c80b 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py @@ -29,7 +29,7 @@ from airflow.utils.context import Context if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py index 36b0fee98e82c..5888db6ad7cf9 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py @@ -49,7 +49,7 @@ from airflow.utils.task_group import TaskGroup if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] diff --git a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py index 52f5fdf2c3383..aa7a2627c2286 100644 --- a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py +++ b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py @@ -52,7 +52,7 @@ from airflow.providers.databricks.operators.databricks import DatabricksTaskBaseOperator if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py index 4010c6ee34603..5e6b92e64847e 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py @@ -40,7 +40,7 @@ from airflow.utils.context import Context if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] diff --git a/providers/google/src/airflow/providers/google/cloud/links/base.py b/providers/google/src/airflow/providers/google/cloud/links/base.py index 77dd5649f35ce..49c3e09b29e10 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/base.py +++ b/providers/google/src/airflow/providers/google/cloud/links/base.py @@ -27,7 +27,7 @@ from airflow.models.taskinstancekey import TaskInstanceKey if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] diff --git a/providers/google/src/airflow/providers/google/cloud/links/datafusion.py b/providers/google/src/airflow/providers/google/cloud/links/datafusion.py index e3a814d23d39f..117be86825c6b 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/links/datafusion.py @@ -25,7 +25,7 @@ from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] diff --git a/providers/google/src/airflow/providers/google/cloud/links/dataproc.py b/providers/google/src/airflow/providers/google/cloud/links/dataproc.py index 8b9c67d82b4d7..657ee717361a0 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/dataproc.py +++ b/providers/google/src/airflow/providers/google/cloud/links/dataproc.py @@ -35,7 +35,7 @@ from airflow.utils.context import Context if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] diff --git a/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py b/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py index b6d805f0c55c4..64b3128560e41 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py @@ -47,7 +47,7 @@ from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] diff --git a/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py b/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py index 222f666033c70..cb7432dba90b9 100644 --- a/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py +++ b/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py @@ -27,7 +27,7 @@ from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py index e918d2f783721..9f3fb298e8f13 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py @@ -42,7 +42,7 @@ from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py index 1077a4de7ffdf..cb2988dcfb462 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -34,7 +34,7 @@ from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py index fc99cd09d0bab..d8052f66d4fc3 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py @@ -41,7 +41,7 @@ from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index cf0e2c7ebe672..8084f975c28e8 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -62,7 +62,7 @@ from airflow.utils.context import Context if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py index 17b21b04fa3ab..1142ca1ed6f62 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py +++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py @@ -50,7 +50,7 @@ if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] diff --git a/providers/yandex/src/airflow/providers/yandex/links/yq.py b/providers/yandex/src/airflow/providers/yandex/links/yq.py index 80d57b533ae59..72305fd6acaa9 100644 --- a/providers/yandex/src/airflow/providers/yandex/links/yq.py +++ b/providers/yandex/src/airflow/providers/yandex/links/yq.py @@ -33,7 +33,7 @@ from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink else: from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]