Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
3309ca1
Moving BaseOperatorLink to task sdk
amoghrajesh Feb 24, 2025
c9a5c26
few more missed occurences
amoghrajesh Feb 24, 2025
86647d4
adding lazy import
amoghrajesh Feb 24, 2025
9e87b87
conditional import for providers
amoghrajesh Feb 24, 2025
76a8ce3
removing double occurences and fixing CI
amoghrajesh Feb 24, 2025
8d21dc5
adding newsfragments
amoghrajesh Feb 24, 2025
5bcacb1
removing old ref from init
amoghrajesh Feb 25, 2025
ca4a035
adding new ref to init
amoghrajesh Feb 25, 2025
56c5a86
fixing tests
amoghrajesh Feb 25, 2025
c8c916a
fixing ut
amoghrajesh Feb 25, 2025
41d11a5
fixing
amoghrajesh Feb 25, 2025
bd9dd3f
Merge branch 'main' into AIP72-baseoperatorlink-to-sdk
amoghrajesh Feb 28, 2025
4f1ace9
import from common compat
amoghrajesh Feb 28, 2025
13a3497
Merge branch 'main' into AIP72-baseoperatorlink-to-sdk
amoghrajesh Feb 28, 2025
6f69845
Merge branch 'main' into AIP72-baseoperatorlink-to-sdk
amoghrajesh Feb 28, 2025
67b5584
generated provider_dependencies.json
amoghrajesh Feb 28, 2025
20a6fc5
fixing static checks
amoghrajesh Feb 28, 2025
d229a54
fixing static checks
amoghrajesh Feb 28, 2025
6a6e5be
spelling ignore
amoghrajesh Feb 28, 2025
448d27a
anotehr try
amoghrajesh Feb 28, 2025
69ad977
anotehr try
amoghrajesh Feb 28, 2025
ea7ebe3
fixing imports
amoghrajesh Mar 3, 2025
7d62538
excluding docs for extra link
amoghrajesh Mar 3, 2025
d69c464
Merge branch 'main' into AIP72-baseoperatorlink-to-sdk
amoghrajesh Mar 3, 2025
4045d09
cleaning the mess
amoghrajesh Mar 3, 2025
72605cd
excluding docs for extra link
amoghrajesh Mar 3, 2025
86437e2
Merge branch 'main' into AIP72-baseoperatorlink-to-sdk
amoghrajesh Mar 3, 2025
4b2d2a7
generated provider_dependencies.json
amoghrajesh Mar 3, 2025
61204af
docs fix
amoghrajesh Mar 3, 2025
909d955
docs fix
amoghrajesh Mar 3, 2025
3532fc3
docs fix
amoghrajesh Mar 3, 2025
5f4392e
Merge branch 'main' into AIP72-baseoperatorlink-to-sdk
amoghrajesh Mar 4, 2025
238db47
Merge branch 'main' into AIP72-baseoperatorlink-to-sdk
amoghrajesh Mar 4, 2025
aa2dda1
review from ash
amoghrajesh Mar 4, 2025
9920d17
support spaces and tabs too
amoghrajesh Mar 4, 2025
9ff883a
Merge branch 'main' into AIP72-baseoperatorlink-to-sdk
amoghrajesh Mar 5, 2025
8789738
fixing the precommit
amoghrajesh Mar 5, 2025
e8c74f7
Merge branch 'main' into AIP72-baseoperatorlink-to-sdk
amoghrajesh Mar 5, 2025
4236711
Apply suggestions from code review
ashb Mar 6, 2025
121f865
Merge branch 'main' into AIP72-baseoperatorlink-to-sdk
amoghrajesh Mar 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -694,10 +694,11 @@ repos:
- id: check-base-operator-usage
language: pygrep
name: Check BaseOperatorLink core imports
description: Make sure BaseOperatorLink is imported from airflow.models.baseoperatorlink in core
entry: "from airflow\\.models import.* BaseOperatorLink"
description: Make sure BaseOperatorLink is not imported from airflow.models in core
entry: "^\\s*from airflow\\.models\\.baseoperatorlink import BaseOperatorLink\\b"
files: \.py$
pass_filenames: true
echo: True
exclude: >
(?x)
^airflow/decorators/.*$|
Expand All @@ -708,9 +709,9 @@ repos:
^dev/provider_packages/.*$
- id: check-base-operator-usage
language: pygrep
name: Check BaseOperator[Link] other imports
description: Make sure BaseOperator[Link] is imported from airflow.models outside of core
entry: "from airflow\\.models\\.baseoperator(link)? import.* BaseOperator"
name: Check BaseOperator other imports
description: Make sure BaseOperator is imported from airflow.models outside of core
entry: "from airflow\\.models\\.baseoperator import.* BaseOperator"
pass_filenames: true
files: >
(?x)
Comment thread
amoghrajesh marked this conversation as resolved.
Expand Down
4 changes: 2 additions & 2 deletions airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def __getattr__(name):
"ID_LEN": "airflow.models.base",
"Base": "airflow.models.base",
"BaseOperator": "airflow.models.baseoperator",
"BaseOperatorLink": "airflow.models.baseoperatorlink",
"BaseOperatorLink": "airflow.sdk.definitions.baseoperatorlink",
"Connection": "airflow.models.connection",
"DagBag": "airflow.models.dagbag",
"DagModel": "airflow.models.dag",
Expand Down Expand Up @@ -117,7 +117,6 @@ def __getattr__(name):
from airflow.jobs.job import Job
from airflow.models.base import ID_LEN, Base
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.connection import Connection
from airflow.models.dag import DAG, DagModel, DagTag
from airflow.models.dagbag import DagBag
Expand All @@ -137,4 +136,5 @@ def __getattr__(name):
from airflow.models.trigger import Trigger
from airflow.models.variable import Variable
from airflow.models.xcom import XCom
from airflow.sdk import BaseOperatorLink
from airflow.sdk.definitions.param import Param
2 changes: 1 addition & 1 deletion airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@
from sqlalchemy.orm import Session

from airflow.models.abstractoperator import TaskStateChangeCallback
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.dag import DAG as SchedulerDAG
from airflow.models.operator import Operator
from airflow.sdk import BaseOperatorLink
from airflow.sdk.definitions.node import DAGNode
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.triggers.base import StartTriggerArgs
Expand Down
3 changes: 2 additions & 1 deletion airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest
from airflow.exceptions import AirflowException, SerializationError, TaskDeferred
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink, XComOperatorLink
from airflow.models.connection import Connection
from airflow.models.dag import DAG, _get_model_data_interval
from airflow.models.expandinput import (
Expand All @@ -64,6 +63,7 @@
BaseAsset,
)
from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator
from airflow.sdk.definitions.baseoperatorlink import XComOperatorLink
from airflow.sdk.definitions.mappedoperator import MappedOperator
from airflow.sdk.definitions.param import Param, ParamsDict
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
Comment thread
amoghrajesh marked this conversation as resolved.
Expand Down Expand Up @@ -98,6 +98,7 @@

from airflow.models import DagRun
from airflow.models.expandinput import ExpandInput
from airflow.sdk import BaseOperatorLink
from airflow.sdk.definitions._internal.node import DAGNode
from airflow.sdk.types import Operator
from airflow.serialization.json_schema import Validator
Expand Down
2 changes: 1 addition & 1 deletion contributing-docs/08_static_code_checks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ require Breeze Docker image to be built locally.
+-----------------------------------------------------------+--------------------------------------------------------+---------+
| check-base-operator-usage | * Check BaseOperator core imports | |
| | * Check BaseOperatorLink core imports | |
| | * Check BaseOperator[Link] other imports | |
| | * Check BaseOperator other imports | |
+-----------------------------------------------------------+--------------------------------------------------------+---------+
| check-boring-cyborg-configuration | Checks for Boring Cyborg configuration consistency | |
+-----------------------------------------------------------+--------------------------------------------------------+---------+
Expand Down
2 changes: 1 addition & 1 deletion devel-common/src/tests_common/test_utils/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from airflow.models.errors import ImportError as ParseImportError # type: ignore[no-redef,attr-defined]

try:
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.sdk import BaseOperatorLink
except ImportError:
# Compatibility for Airflow 2.7.*
from airflow.models.baseoperator import BaseOperatorLink
Expand Down
6 changes: 3 additions & 3 deletions docs/apache-airflow/howto/define-extra-link.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ The following code shows how to add extra links to an operator via Plugins:
.. code-block:: python

from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.sdk import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin

Expand Down Expand Up @@ -85,7 +85,7 @@ tasks using :class:`~airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Ope
.. code-block:: python

from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.sdk import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
Expand Down Expand Up @@ -128,7 +128,7 @@ Console, but if we wanted to change that link we could:
.. code-block:: python

from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.sdk import BaseOperatorLink
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.xcom import XCom
from airflow.plugins_manager import AirflowPlugin
Expand Down
10 changes: 3 additions & 7 deletions docs/apache-airflow/public-airflow-interface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -264,19 +264,15 @@ can be implemented to respond to DAG/Task lifecycle events.

You can read more about Listeners in :doc:`administration-and-deployment/listeners`.

..
TODO AIP-72: This class has been moved to task sdk but we cannot add a doc reference for it yet because task sdk doesn't have rendered docs yet.

Extra Links
-----------

Extra links are dynamic links that could be added to Airflow independently from custom Operators. Normally
they can be defined by the Operators, but plugins allow you to override the links on a global level.

.. toctree::
:includehidden:
:glob:
:maxdepth: 1

_api/airflow/models/baseoperatorlink/index

You can read more about the Extra Links in :doc:`/howto/define-extra-link`.

Using Public Interface to integrate with external services and applications
Expand Down
1 change: 0 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ def _get_rst_filepath_from_path(filepath: pathlib.Path):

models_included: set[str] = {
"baseoperator.py",
"baseoperatorlink.py",
"connection.py",
"dag.py",
"dagrun.py",
Expand Down
2 changes: 2 additions & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ BaseHook
BaseObject
BaseOperator
baseOperator
BaseOperatorLink
baseoperatorlink
basestring
basetaskrunner
BaseView
Expand Down
5 changes: 4 additions & 1 deletion generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@
"devel-deps": [],
"plugins": [],
"cross-providers-deps": [
"common.compat",
"http",
"openlineage"
],
Expand Down Expand Up @@ -1404,7 +1405,9 @@
],
"devel-deps": [],
"plugins": [],
"cross-providers-deps": [],
"cross-providers-deps": [
"common.compat"
],
"excluded-python-versions": [],
"state": "ready"
},
Expand Down
11 changes: 10 additions & 1 deletion newsfragments/aip-72.significant.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ As part of this change the following breaking changes have occurred:

It is recommended that you replace such a custom operator with a deferrable sensor, a condition or another triggering mechanism.

- ``BaseOperatorLink`` has now been moved into the task SDK to be consumed by DAG authors to write custom operator links.

Any occurrences of imports from ``airflow.models.baseoperatorlink`` will need to be updated to ``airflow.sdk.definitions.baseoperatorlink``

* Types of change

Expand All @@ -50,7 +53,7 @@ As part of this change the following breaking changes have occurred:
* [ ] API changes
* [ ] CLI changes
* [x] Behaviour changes
* [ ] Plugin changes
* [x] Plugin changes
* [ ] Dependency changes
* [ ] Code interface changes

Expand All @@ -60,3 +63,9 @@ As part of this change the following breaking changes have occurred:

* [x] ``core.task_runner``
* [x] ``core.enable_xcom_pickling``

* ruff

* AIR302

* [ ] ``airflow.models.baseoperatorlink`` → ``airflow.sdk``
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,20 @@

from typing import TYPE_CHECKING, ClassVar

from airflow.models import BaseOperatorLink, XCom
from airflow.models import XCom
from airflow.providers.amazon.aws.utils.suppress import return_on_error
from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS

if TYPE_CHECKING:
from airflow.models import BaseOperator
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperatorLink
else:
from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]


BASE_AWS_CONSOLE_LINK = "https://console.{aws_domain}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator, BaseOperatorLink, XCom
from airflow.models import BaseOperator, XCom
from airflow.providers.databricks.hooks.databricks import DatabricksHook, RunLifeCycleState, RunState
from airflow.providers.databricks.operators.databricks_workflow import (
DatabricksWorkflowTaskGroup,
Expand All @@ -41,12 +41,18 @@
)
from airflow.providers.databricks.triggers.databricks import DatabricksExecutionTrigger
from airflow.providers.databricks.utils.databricks import normalise_json_content, validate_trigger_event
from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS

if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context
from airflow.utils.task_group import TaskGroup

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperatorLink
else:
from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]

DEFER_METHOD_NAME = "execute_complete"
XCOM_RUN_ID_KEY = "run_id"
XCOM_JOB_ID_KEY = "job_id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from flask_appbuilder.api import expose

from airflow.exceptions import AirflowException, TaskInstanceNotFound
from airflow.models import BaseOperator, BaseOperatorLink, DagBag
from airflow.models import DagBag
from airflow.models.dag import DAG, clear_task_instances
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
Expand All @@ -48,8 +48,14 @@
if TYPE_CHECKING:
from sqlalchemy.orm.session import Session

from airflow.models import BaseOperator
from airflow.providers.databricks.operators.databricks import DatabricksTaskBaseOperator

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperatorLink
else:
from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]


REPAIR_WAIT_ATTEMPTS = os.getenv("DATABRICKS_REPAIR_WAIT_ATTEMPTS", 20)
REPAIR_WAIT_DELAY = os.getenv("DATABRICKS_REPAIR_WAIT_DELAY", 0.5)
Expand Down
15 changes: 8 additions & 7 deletions providers/dbt/cloud/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,16 @@ You can install such cross-provider dependencies when installing from PyPI. For

.. code-block:: bash

pip install apache-airflow-providers-dbt-cloud[http]
pip install apache-airflow-providers-dbt-cloud[common.compat]


============================================================================================================== ===============
Dependent package Extra
============================================================================================================== ===============
`apache-airflow-providers-http <https://airflow.apache.org/docs/apache-airflow-providers-http>`_ ``http``
`apache-airflow-providers-openlineage <https://airflow.apache.org/docs/apache-airflow-providers-openlineage>`_ ``openlineage``
============================================================================================================== ===============
================================================================================================================== =================
Dependent package Extra
================================================================================================================== =================
`apache-airflow-providers-common-compat <https://airflow.apache.org/docs/apache-airflow-providers-common-compat>`_ ``common.compat``
`apache-airflow-providers-http <https://airflow.apache.org/docs/apache-airflow-providers-http>`_ ``http``
`apache-airflow-providers-openlineage <https://airflow.apache.org/docs/apache-airflow-providers-openlineage>`_ ``openlineage``
================================================================================================================== =================

The changelog for the provider package can be found in the
`changelog <https://airflow.apache.org/docs/apache-airflow-providers-dbt-cloud/4.2.0/changelog.html>`_.
4 changes: 4 additions & 0 deletions providers/dbt/cloud/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,16 @@ dependencies = [
"openlineage" = [
"apache-airflow-providers-openlineage>=1.7.0",
]
"common.compat" = [
"apache-airflow-providers-common-compat"
]

[dependency-groups]
dev = [
"apache-airflow",
"apache-airflow-task-sdk",
"apache-airflow-devel-common",
"apache-airflow-providers-common-compat",
"apache-airflow-providers-http",
"apache-airflow-providers-openlineage",
# Additional devel dependencies (do not remove this line and add extra development dependencies)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ def get_provider_info():
"asgiref>=2.3.0",
"aiohttp>=3.9.2",
],
"optional-dependencies": {"openlineage": ["apache-airflow-providers-openlineage>=1.7.0"]},
"optional-dependencies": {
"openlineage": ["apache-airflow-providers-openlineage>=1.7.0"],
"common.compat": ["apache-airflow-providers-common-compat"],
},
"devel-dependencies": [],
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
from typing import TYPE_CHECKING, Any

from airflow.configuration import conf
from airflow.models import BaseOperator, BaseOperatorLink, XCom
from airflow.models import BaseOperator, XCom
from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.dbt.cloud.hooks.dbt import (
DbtCloudHook,
DbtCloudJobRunException,
Expand All @@ -38,6 +39,11 @@
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.context import Context

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperatorLink
else:
from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]


class DbtCloudRunJobOperatorLink(BaseOperatorLink):
"""Allows users to monitor the triggered job run directly in dbt Cloud."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@

from typing import TYPE_CHECKING, ClassVar

from airflow.models import BaseOperatorLink, XCom
from airflow.models import XCom
from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS

if TYPE_CHECKING:
from airflow.models import BaseOperator
from airflow.models.taskinstancekey import TaskInstanceKey

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperatorLink
else:
from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]

BASE_LINK = "https://console.cloud.google.com"

Expand Down
Loading