diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6fd607e835ca5..2c33a1eb87d40 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -694,10 +694,11 @@ repos: - id: check-base-operator-usage language: pygrep name: Check BaseOperatorLink core imports - description: Make sure BaseOperatorLink is imported from airflow.models.baseoperatorlink in core - entry: "from airflow\\.models import.* BaseOperatorLink" + description: Make sure BaseOperatorLink is not imported from airflow.models in core + entry: "^\\s*from airflow\\.models\\.baseoperatorlink import BaseOperatorLink\\b" files: \.py$ pass_filenames: true + echo: True exclude: > (?x) ^airflow/decorators/.*$| @@ -708,9 +709,9 @@ repos: ^dev/provider_packages/.*$ - id: check-base-operator-usage language: pygrep - name: Check BaseOperator[Link] other imports - description: Make sure BaseOperator[Link] is imported from airflow.models outside of core - entry: "from airflow\\.models\\.baseoperator(link)? import.* BaseOperator" + name: Check BaseOperator other imports + description: Make sure BaseOperator is imported from airflow.models outside of core + entry: "from airflow\\.models\\.baseoperator import.* BaseOperator" pass_filenames: true files: > (?x) diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index 6bd3883b139af..89f93e1f2a9d5 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -88,7 +88,7 @@ def __getattr__(name): "ID_LEN": "airflow.models.base", "Base": "airflow.models.base", "BaseOperator": "airflow.models.baseoperator", - "BaseOperatorLink": "airflow.models.baseoperatorlink", + "BaseOperatorLink": "airflow.sdk.definitions.baseoperatorlink", "Connection": "airflow.models.connection", "DagBag": "airflow.models.dagbag", "DagModel": "airflow.models.dag", @@ -117,7 +117,6 @@ def __getattr__(name): from airflow.jobs.job import Job from airflow.models.base import ID_LEN, Base from airflow.models.baseoperator import BaseOperator - from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.connection import Connection from airflow.models.dag import DAG, DagModel, DagTag from airflow.models.dagbag import DagBag @@ -137,4 +136,5 @@ def __getattr__(name): from airflow.models.trigger import Trigger from airflow.models.variable import Variable from airflow.models.xcom import XCom + from airflow.sdk import BaseOperatorLink from airflow.sdk.definitions.param import Param diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index d1ed6d889ac6b..bc9333c48444f 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -88,9 +88,9 @@ from sqlalchemy.orm import Session from airflow.models.abstractoperator import TaskStateChangeCallback - from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DAG as SchedulerDAG from airflow.models.operator import Operator + from airflow.sdk import BaseOperatorLink from airflow.sdk.definitions.node import DAGNode from airflow.ti_deps.deps.base_ti_dep import BaseTIDep from airflow.triggers.base import StartTriggerArgs diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 9b471d67b5cac..b229e39f70d7a 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -40,7 +40,6 @@ from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest from airflow.exceptions import AirflowException, SerializationError, TaskDeferred from airflow.models.baseoperator import BaseOperator -from airflow.models.baseoperatorlink import BaseOperatorLink, XComOperatorLink from airflow.models.connection import Connection from airflow.models.dag import DAG, _get_model_data_interval from airflow.models.expandinput import ( @@ -64,6 +63,7 @@ BaseAsset, ) from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator +from airflow.sdk.definitions.baseoperatorlink import XComOperatorLink from airflow.sdk.definitions.mappedoperator import MappedOperator from airflow.sdk.definitions.param import Param, ParamsDict from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup @@ -98,6 +98,7 @@ from airflow.models import DagRun from airflow.models.expandinput import ExpandInput + from airflow.sdk import BaseOperatorLink from airflow.sdk.definitions._internal.node import DAGNode from airflow.sdk.types import Operator from airflow.serialization.json_schema import Validator diff --git a/contributing-docs/08_static_code_checks.rst b/contributing-docs/08_static_code_checks.rst index bde70f0cef217..8ace7396cc636 100644 --- a/contributing-docs/08_static_code_checks.rst +++ b/contributing-docs/08_static_code_checks.rst @@ -132,7 +132,7 @@ require Breeze Docker image to be built locally. +-----------------------------------------------------------+--------------------------------------------------------+---------+ | check-base-operator-usage | * Check BaseOperator core imports | | | | * Check BaseOperatorLink core imports | | -| | * Check BaseOperator[Link] other imports | | +| | * Check BaseOperator other imports | | +-----------------------------------------------------------+--------------------------------------------------------+---------+ | check-boring-cyborg-configuration | Checks for Boring Cyborg configuration consistency | | +-----------------------------------------------------------+--------------------------------------------------------+---------+ diff --git a/devel-common/src/tests_common/test_utils/compat.py b/devel-common/src/tests_common/test_utils/compat.py index 7b231bdc73e72..e442981739cfa 100644 --- a/devel-common/src/tests_common/test_utils/compat.py +++ b/devel-common/src/tests_common/test_utils/compat.py @@ -36,7 +36,7 @@ from airflow.models.errors import ImportError as ParseImportError # type: ignore[no-redef,attr-defined] try: - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink except ImportError: # Compatibility for Airflow 2.7.* from airflow.models.baseoperator import BaseOperatorLink diff --git a/docs/apache-airflow/howto/define-extra-link.rst b/docs/apache-airflow/howto/define-extra-link.rst index 14864d1450036..de1cccdf73de0 100644 --- a/docs/apache-airflow/howto/define-extra-link.rst +++ b/docs/apache-airflow/howto/define-extra-link.rst @@ -31,7 +31,7 @@ The following code shows how to add extra links to an operator via Plugins: .. code-block:: python from airflow.models.baseoperator import BaseOperator - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink from airflow.models.taskinstancekey import TaskInstanceKey from airflow.plugins_manager import AirflowPlugin @@ -85,7 +85,7 @@ tasks using :class:`~airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Ope .. code-block:: python from airflow.models.baseoperator import BaseOperator - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink from airflow.models.taskinstancekey import TaskInstanceKey from airflow.plugins_manager import AirflowPlugin from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator @@ -128,7 +128,7 @@ Console, but if we wanted to change that link we could: .. code-block:: python from airflow.models.baseoperator import BaseOperator - from airflow.models.baseoperatorlink import BaseOperatorLink + from airflow.sdk import BaseOperatorLink from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.xcom import XCom from airflow.plugins_manager import AirflowPlugin diff --git a/docs/apache-airflow/public-airflow-interface.rst b/docs/apache-airflow/public-airflow-interface.rst index 23b5dffe4d33f..d1012f5784ac5 100644 --- a/docs/apache-airflow/public-airflow-interface.rst +++ b/docs/apache-airflow/public-airflow-interface.rst @@ -264,19 +264,15 @@ can be implemented to respond to DAG/Task lifecycle events. You can read more about Listeners in :doc:`administration-and-deployment/listeners`. +.. + TODO AIP-72: This class has been moved to task sdk but we cannot add a doc reference for it yet because task sdk doesn't have rendered docs yet. + Extra Links ----------- Extra links are dynamic links that could be added to Airflow independently from custom Operators. Normally they can be defined by the Operators, but plugins allow you to override the links on a global level. -.. toctree:: - :includehidden: - :glob: - :maxdepth: 1 - - _api/airflow/models/baseoperatorlink/index - You can read more about the Extra Links in :doc:`/howto/define-extra-link`. Using Public Interface to integrate with external services and applications diff --git a/docs/conf.py b/docs/conf.py index 6285efea0f56e..9f0a9686facfd 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -277,7 +277,6 @@ def _get_rst_filepath_from_path(filepath: pathlib.Path): models_included: set[str] = { "baseoperator.py", - "baseoperatorlink.py", "connection.py", "dag.py", "dagrun.py", diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index ff16e3f01cafe..6e00332f5a166 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -171,6 +171,8 @@ BaseHook BaseObject BaseOperator baseOperator +BaseOperatorLink +baseoperatorlink basestring basetaskrunner BaseView diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 264fe73a06c59..15e6ba44b3423 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -494,6 +494,7 @@ "devel-deps": [], "plugins": [], "cross-providers-deps": [ + "common.compat", "http", "openlineage" ], @@ -1404,7 +1405,9 @@ ], "devel-deps": [], "plugins": [], - "cross-providers-deps": [], + "cross-providers-deps": [ + "common.compat" + ], "excluded-python-versions": [], "state": "ready" }, diff --git a/newsfragments/aip-72.significant.rst b/newsfragments/aip-72.significant.rst index ed95d3031471e..cf08e164d83fd 100644 --- a/newsfragments/aip-72.significant.rst +++ b/newsfragments/aip-72.significant.rst @@ -42,6 +42,9 @@ As part of this change the following breaking changes have occurred: It is recommended that you replace such a custom operator with a deferrable sensor, a condition or another triggering mechanism. +- ``BaseOperatorLink`` has now been moved into the task SDK to be consumed by DAG authors to write custom operator links. + + Any occurrences of imports from ``airflow.models.baseoperatorlink`` will need to be updated to ``airflow.sdk.definitions.baseoperatorlink`` * Types of change @@ -50,7 +53,7 @@ As part of this change the following breaking changes have occurred: * [ ] API changes * [ ] CLI changes * [x] Behaviour changes - * [ ] Plugin changes + * [x] Plugin changes * [ ] Dependency changes * [ ] Code interface changes @@ -60,3 +63,9 @@ As part of this change the following breaking changes have occurred: * [x] ``core.task_runner`` * [x] ``core.enable_xcom_pickling`` + + * ruff + + * AIR302 + + * [ ] ``airflow.models.baseoperatorlink`` → ``airflow.sdk`` diff --git a/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py b/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py index 83da4dd93cba9..47330c473c80b 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py @@ -19,14 +19,20 @@ from typing import TYPE_CHECKING, ClassVar -from airflow.models import BaseOperatorLink, XCom +from airflow.models import XCom from airflow.providers.amazon.aws.utils.suppress import return_on_error +from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.models import BaseOperator from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + BASE_AWS_CONSOLE_LINK = "https://console.{aws_domain}" diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py index 4303e32cb562a..5888db6ad7cf9 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py @@ -29,7 +29,7 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException -from airflow.models import BaseOperator, BaseOperatorLink, XCom +from airflow.models import BaseOperator, XCom from airflow.providers.databricks.hooks.databricks import DatabricksHook, RunLifeCycleState, RunState from airflow.providers.databricks.operators.databricks_workflow import ( DatabricksWorkflowTaskGroup, @@ -41,12 +41,18 @@ ) from airflow.providers.databricks.triggers.databricks import DatabricksExecutionTrigger from airflow.providers.databricks.utils.databricks import normalise_json_content, validate_trigger_event +from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context from airflow.utils.task_group import TaskGroup +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + DEFER_METHOD_NAME = "execute_complete" XCOM_RUN_ID_KEY = "run_id" XCOM_JOB_ID_KEY = "job_id" diff --git a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py index 05da1ed5609da..aa7a2627c2286 100644 --- a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py +++ b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py @@ -27,7 +27,7 @@ from flask_appbuilder.api import expose from airflow.exceptions import AirflowException, TaskInstanceNotFound -from airflow.models import BaseOperator, BaseOperatorLink, DagBag +from airflow.models import DagBag from airflow.models.dag import DAG, clear_task_instances from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance, TaskInstanceKey @@ -48,8 +48,14 @@ if TYPE_CHECKING: from sqlalchemy.orm.session import Session + from airflow.models import BaseOperator from airflow.providers.databricks.operators.databricks import DatabricksTaskBaseOperator +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + REPAIR_WAIT_ATTEMPTS = os.getenv("DATABRICKS_REPAIR_WAIT_ATTEMPTS", 20) REPAIR_WAIT_DELAY = os.getenv("DATABRICKS_REPAIR_WAIT_DELAY", 0.5) diff --git a/providers/dbt/cloud/README.rst b/providers/dbt/cloud/README.rst index f264809b8a721..ab1cf3deaf071 100644 --- a/providers/dbt/cloud/README.rst +++ b/providers/dbt/cloud/README.rst @@ -69,15 +69,16 @@ You can install such cross-provider dependencies when installing from PyPI. For .. code-block:: bash - pip install apache-airflow-providers-dbt-cloud[http] + pip install apache-airflow-providers-dbt-cloud[common.compat] -============================================================================================================== =============== -Dependent package Extra -============================================================================================================== =============== -`apache-airflow-providers-http `_ ``http`` -`apache-airflow-providers-openlineage `_ ``openlineage`` -============================================================================================================== =============== +================================================================================================================== ================= +Dependent package Extra +================================================================================================================== ================= +`apache-airflow-providers-common-compat `_ ``common.compat`` +`apache-airflow-providers-http `_ ``http`` +`apache-airflow-providers-openlineage `_ ``openlineage`` +================================================================================================================== ================= The changelog for the provider package can be found in the `changelog `_. diff --git a/providers/dbt/cloud/pyproject.toml b/providers/dbt/cloud/pyproject.toml index 112da22627ec6..6e18e7a75f62f 100644 --- a/providers/dbt/cloud/pyproject.toml +++ b/providers/dbt/cloud/pyproject.toml @@ -70,12 +70,16 @@ dependencies = [ "openlineage" = [ "apache-airflow-providers-openlineage>=1.7.0", ] +"common.compat" = [ + "apache-airflow-providers-common-compat" +] [dependency-groups] dev = [ "apache-airflow", "apache-airflow-task-sdk", "apache-airflow-devel-common", + "apache-airflow-providers-common-compat", "apache-airflow-providers-http", "apache-airflow-providers-openlineage", # Additional devel dependencies (do not remove this line and add extra development dependencies) diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/get_provider_info.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/get_provider_info.py index 2adda4d341fb4..7d9f5a1d2a5b7 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/get_provider_info.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/get_provider_info.py @@ -98,6 +98,9 @@ def get_provider_info(): "asgiref>=2.3.0", "aiohttp>=3.9.2", ], - "optional-dependencies": {"openlineage": ["apache-airflow-providers-openlineage>=1.7.0"]}, + "optional-dependencies": { + "openlineage": ["apache-airflow-providers-openlineage>=1.7.0"], + "common.compat": ["apache-airflow-providers-common-compat"], + }, "devel-dependencies": [], } diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py index d9aab8e9e2f67..5e6b92e64847e 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py @@ -24,7 +24,8 @@ from typing import TYPE_CHECKING, Any from airflow.configuration import conf -from airflow.models import BaseOperator, BaseOperatorLink, XCom +from airflow.models import BaseOperator, XCom +from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS from airflow.providers.dbt.cloud.hooks.dbt import ( DbtCloudHook, DbtCloudJobRunException, @@ -38,6 +39,11 @@ from airflow.providers.openlineage.extractors import OperatorLineage from airflow.utils.context import Context +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + class DbtCloudRunJobOperatorLink(BaseOperatorLink): """Allows users to monitor the triggered job run directly in dbt Cloud.""" diff --git a/providers/google/src/airflow/providers/google/cloud/links/base.py b/providers/google/src/airflow/providers/google/cloud/links/base.py index 5c4a39b78146f..49c3e09b29e10 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/base.py +++ b/providers/google/src/airflow/providers/google/cloud/links/base.py @@ -19,12 +19,17 @@ from typing import TYPE_CHECKING, ClassVar -from airflow.models import BaseOperatorLink, XCom +from airflow.models import XCom +from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.models import BaseOperator from airflow.models.taskinstancekey import TaskInstanceKey +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] BASE_LINK = "https://console.cloud.google.com" diff --git a/providers/google/src/airflow/providers/google/cloud/links/datafusion.py b/providers/google/src/airflow/providers/google/cloud/links/datafusion.py index 4f3dd0904427b..117be86825c6b 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/links/datafusion.py @@ -21,14 +21,19 @@ from typing import TYPE_CHECKING, ClassVar -from airflow.models import BaseOperatorLink, XCom +from airflow.models import XCom +from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] if TYPE_CHECKING: from airflow.models import BaseOperator from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context - BASE_LINK = "https://console.cloud.google.com/data-fusion" DATAFUSION_INSTANCE_LINK = BASE_LINK + "/locations/{region}/instances/{instance_name}?project={project_id}" DATAFUSION_PIPELINES_LINK = "{uri}/cdap/ns/{namespace}/pipelines" diff --git a/providers/google/src/airflow/providers/google/cloud/links/dataproc.py b/providers/google/src/airflow/providers/google/cloud/links/dataproc.py index 556b7c60a4bb1..657ee717361a0 100644 --- a/providers/google/src/airflow/providers/google/cloud/links/dataproc.py +++ b/providers/google/src/airflow/providers/google/cloud/links/dataproc.py @@ -25,14 +25,20 @@ import attr from airflow.exceptions import AirflowProviderDeprecationWarning -from airflow.models import BaseOperatorLink, XCom +from airflow.models import XCom from airflow.providers.google.cloud.links.base import BASE_LINK, BaseGoogleLink +from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.models import BaseOperator from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + def __getattr__(name: str) -> Any: # PEP-562: deprecate module-level variable diff --git a/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py b/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py index a9470b17e3e3c..64b3128560e41 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc_metastore.py @@ -40,9 +40,17 @@ if TYPE_CHECKING: from google.protobuf.field_mask_pb2 import FieldMask + from airflow.models import BaseOperator from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context +from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + BASE_LINK = "https://console.cloud.google.com" METASTORE_BASE_LINK = BASE_LINK + "/dataproc/metastore/services/{region}/{service_id}" diff --git a/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py b/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py index 52d62851ed987..cb7432dba90b9 100644 --- a/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py +++ b/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py @@ -18,12 +18,19 @@ from typing import TYPE_CHECKING, ClassVar -from airflow.models import BaseOperator, BaseOperatorLink, XCom +from airflow.models import BaseOperator, XCom if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context +from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + BASE_LINK = "https://analytics.google.com/analytics/web/" diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py index d6313c68b2286..9f3fb298e8f13 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py @@ -25,7 +25,7 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook -from airflow.models import BaseOperator, BaseOperatorLink, XCom +from airflow.models import BaseOperator, XCom from airflow.providers.microsoft.azure.hooks.data_factory import ( AzureDataFactoryHook, AzureDataFactoryPipelineRunException, @@ -39,6 +39,13 @@ from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context +from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + class AzureDataFactoryPipelineRunLink(LoggingMixin, BaseOperatorLink): """Construct a link to monitor a pipeline run in Azure Data Factory.""" diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py index 1c6878c27af68..cb2988dcfb462 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -21,7 +21,7 @@ from typing import TYPE_CHECKING, Any from airflow.exceptions import AirflowException -from airflow.models import BaseOperator, BaseOperatorLink +from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.powerbi import PowerBIHook from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger @@ -31,6 +31,13 @@ from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context +from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + class PowerBILink(BaseOperatorLink): """Construct a link to monitor a dataset in Power BI.""" diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py index 275a45b30f455..d8052f66d4fc3 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py @@ -23,7 +23,7 @@ from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook -from airflow.models import BaseOperator, BaseOperatorLink, XCom +from airflow.models import BaseOperator, XCom from airflow.providers.microsoft.azure.hooks.synapse import ( AzureSynapseHook, AzureSynapsePipelineHook, @@ -38,6 +38,13 @@ from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context +from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + class AzureSynapseRunSparkBatchOperator(BaseOperator): """ diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index 14c51a0579097..8084f975c28e8 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -34,7 +34,7 @@ DagNotFound, DagRunAlreadyExists, ) -from airflow.models import BaseOperator, BaseOperatorLink +from airflow.models import BaseOperator from airflow.models.dag import DagModel from airflow.models.dagbag import DagBag from airflow.models.dagrun import DagRun @@ -61,6 +61,11 @@ # TODO: Remove once provider drops support for Airflow 2 from airflow.utils.context import Context +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + class TriggerDagRunLink(BaseOperatorLink): """ diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py index 68e8c1a15b281..1142ca1ed6f62 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py +++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py @@ -25,7 +25,6 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowSkipException -from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DagModel from airflow.models.dagbag import DagBag from airflow.providers.standard.operators.empty import EmptyOperator @@ -50,6 +49,12 @@ from airflow.utils.context import Context +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + + class ExternalDagLink(BaseOperatorLink): """ Operator link for ExternalTaskSensor and ExternalTaskMarker. diff --git a/providers/yandex/README.rst b/providers/yandex/README.rst index 30a7c3522ad74..e017cf931848b 100644 --- a/providers/yandex/README.rst +++ b/providers/yandex/README.rst @@ -60,5 +60,24 @@ PIP package Version required ``yandex-query-client`` ``>=0.1.4`` ======================= ================== +Cross provider package dependencies +----------------------------------- + +Those are dependencies that might be needed in order to use all the features of the package. +You need to install the specified provider packages in order to use them. + +You can install such cross-provider dependencies when installing from PyPI. For example: + +.. code-block:: bash + + pip install apache-airflow-providers-yandex[common.compat] + + +================================================================================================================== ================= +Dependent package Extra +================================================================================================================== ================= +`apache-airflow-providers-common-compat `_ ``common.compat`` +================================================================================================================== ================= + The changelog for the provider package can be found in the `changelog `_. diff --git a/providers/yandex/pyproject.toml b/providers/yandex/pyproject.toml index 68c2e95796304..6991ed2efeb5e 100644 --- a/providers/yandex/pyproject.toml +++ b/providers/yandex/pyproject.toml @@ -62,11 +62,19 @@ dependencies = [ "yandex-query-client>=0.1.4", ] +# The optional dependencies should be modified in place in the generated file +# Any change in the dependencies is preserved when the file is regenerated +[project.optional-dependencies] +"common.compat" = [ + "apache-airflow-providers-common-compat" +] + [dependency-groups] dev = [ "apache-airflow", "apache-airflow-task-sdk", "apache-airflow-devel-common", + "apache-airflow-providers-common-compat", # Additional devel dependencies (do not remove this line and add extra development dependencies) ] diff --git a/providers/yandex/src/airflow/providers/yandex/get_provider_info.py b/providers/yandex/src/airflow/providers/yandex/get_provider_info.py index 0fe1332f4dda1..23d6cb93469df 100644 --- a/providers/yandex/src/airflow/providers/yandex/get_provider_info.py +++ b/providers/yandex/src/airflow/providers/yandex/get_provider_info.py @@ -119,5 +119,6 @@ def get_provider_info(): } }, "dependencies": ["apache-airflow>=2.9.0", "yandexcloud>=0.308.0", "yandex-query-client>=0.1.4"], + "optional-dependencies": {"common.compat": ["apache-airflow-providers-common-compat"]}, "devel-dependencies": [], } diff --git a/providers/yandex/src/airflow/providers/yandex/links/yq.py b/providers/yandex/src/airflow/providers/yandex/links/yq.py index 86babfec0f04a..72305fd6acaa9 100644 --- a/providers/yandex/src/airflow/providers/yandex/links/yq.py +++ b/providers/yandex/src/airflow/providers/yandex/links/yq.py @@ -18,7 +18,7 @@ from typing import TYPE_CHECKING -from airflow.models import BaseOperatorLink, XCom +from airflow.models import XCom if TYPE_CHECKING: from airflow.models import BaseOperator @@ -30,6 +30,13 @@ # TODO: Remove once provider drops support for Airflow 2 from airflow.utils.context import Context +from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink +else: + from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] + XCOM_WEBLINK_KEY = "web_link" diff --git a/task-sdk/src/airflow/sdk/__init__.py b/task-sdk/src/airflow/sdk/__init__.py index 65131790c7c3d..5f4ecfd1d9007 100644 --- a/task-sdk/src/airflow/sdk/__init__.py +++ b/task-sdk/src/airflow/sdk/__init__.py @@ -26,6 +26,7 @@ "AssetAny", "AssetWatcher", "BaseOperator", + "BaseOperatorLink", "Connection", "Context", "DAG", @@ -50,6 +51,7 @@ from airflow.sdk.definitions.assets.decorators import asset from airflow.sdk.definitions.assets.metadata import Metadata from airflow.sdk.definitions.baseoperator import BaseOperator + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.sdk.definitions.connection import Connection from airflow.sdk.definitions.context import Context, get_current_context, get_parsing_context from airflow.sdk.definitions.dag import DAG, dag @@ -67,6 +69,7 @@ "AssetAny": ".definitions.asset", "AssetWatcher": ".definitions.asset", "BaseOperator": ".definitions.baseoperator", + "BaseOperatorLink": ".definitions.baseoperatorlink", "Connection": ".definitions.connection", "Context": ".definitions.context", "DAG": ".definitions.dag", diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py index 481fb2ee773d7..507349abfdca4 100644 --- a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py +++ b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py @@ -41,8 +41,8 @@ if TYPE_CHECKING: import jinja2 - from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.sdk.definitions.baseoperator import BaseOperator + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.sdk.definitions.context import Context from airflow.sdk.definitions.dag import DAG from airflow.sdk.definitions.mappedoperator import MappedOperator diff --git a/airflow/models/baseoperatorlink.py b/task-sdk/src/airflow/sdk/definitions/baseoperatorlink.py similarity index 100% rename from airflow/models/baseoperatorlink.py rename to task-sdk/src/airflow/sdk/definitions/baseoperatorlink.py diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py index 92f83792f3c79..2608d4eefebf0 100644 --- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py +++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py @@ -63,7 +63,6 @@ from airflow.models.abstractoperator import ( TaskStateChangeCallback, ) - from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.expandinput import ( ExpandInput, OperatorExpandArgument, @@ -71,6 +70,7 @@ ) from airflow.models.xcom_arg import XComArg from airflow.sdk.definitions.baseoperator import BaseOperator + from airflow.sdk.definitions.baseoperatorlink import BaseOperatorLink from airflow.sdk.definitions.dag import DAG from airflow.sdk.definitions.param import ParamsDict from airflow.sdk.types import Operator diff --git a/tests/cli/commands/local_commands/test_plugins_command.py b/tests/cli/commands/local_commands/test_plugins_command.py index af6a9ca5a27eb..d780d5886320e 100644 --- a/tests/cli/commands/local_commands/test_plugins_command.py +++ b/tests/cli/commands/local_commands/test_plugins_command.py @@ -26,8 +26,8 @@ from airflow.cli import cli_parser from airflow.cli.commands.local_commands import plugins_command from airflow.listeners.listener import get_listener_manager -from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.plugins_manager import AirflowPlugin +from airflow.sdk import BaseOperatorLink from tests.plugins.test_plugin import AirflowTestPlugin as ComplexAirflowPlugin from tests_common.test_utils.mock_plugins import mock_plugin_manager diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index d9c27d8eb080a..39be12bd54f5a 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -57,7 +57,6 @@ ) from airflow.hooks.base import BaseHook from airflow.models.baseoperator import BaseOperator -from airflow.models.baseoperatorlink import XComOperatorLink from airflow.models.connection import Connection from airflow.models.dag import DAG from airflow.models.dagbag import DagBag @@ -68,6 +67,7 @@ from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.sensors.bash import BashSensor from airflow.sdk.definitions.asset import Asset +from airflow.sdk.definitions.baseoperatorlink import XComOperatorLink from airflow.sdk.definitions.param import Param, ParamsDict from airflow.security import permissions from airflow.serialization.enums import Encoding