From 1f5bdea4839eddaf9c3d59fbd29f2879574f9236 Mon Sep 17 00:00:00 2001 From: Mohamad Hallak <16711801+mrhallak@users.noreply.github.com> Date: Wed, 15 Apr 2026 12:53:41 +0200 Subject: [PATCH 1/2] Migrate imports for Airflow 3.x compatibility Breaking import changes: - DummyOperator -> EmptyOperator (airflow.operators.empty) - PythonOperator/ShortCircuitOperator -> airflow.operators.python - airflow.contrib.hooks.aws_hook.AwsHook -> providers.amazon AwsBaseHook - airflow.contrib.hooks.snowflake_hook -> providers.snowflake - airflow.contrib.operators.slack_webhook_operator -> providers.slack - airflow.hooks.base_hook -> airflow.hooks.base - airflow.www.utils removed (used in redshift_sql_operator) Removed deprecated patterns: - @apply_defaults decorator (automatic in Airflow 3) - provide_context=True parameter (always provided in Airflow 3) Renamed context variables: - execution_date -> logical_date (with fallback for compatibility) --- dagger/alerts/alert.py | 2 +- dagger/dag_creator/airflow/dag_creator.py | 4 ++-- dagger/dag_creator/airflow/hooks/sqoop_hook.py | 2 +- .../airflow/operator_creators/airflow_op_creator.py | 1 - .../airflow/operator_creators/dummy_creator.py | 4 ++-- .../airflow/operator_creators/python_creator.py | 3 +-- dagger/dag_creator/airflow/operator_factory.py | 4 ++-- .../dag_creator/airflow/operators/aws_athena_operator.py | 2 -- .../airflow/operators/aws_glue_job_operator.py | 6 ++---- .../dag_creator/airflow/operators/dagger_base_operator.py | 2 -- dagger/dag_creator/airflow/operators/postgres_operator.py | 2 -- .../airflow/operators/redshift_sql_operator.py | 6 +----- .../dag_creator/airflow/operators/snowflake_operator.py | 4 +--- .../airflow/operators/spark_submit_operator.py | 3 --- dagger/dag_creator/airflow/operators/sqoop_operator.py | 2 -- dagger/dag_creator/airflow/utils/operator_factories.py | 3 +-- dagger/dag_creator/airflow/utils/slack_alerts.py | 8 ++++---- 17 files changed, 18 insertions(+), 40 deletions(-) diff --git a/dagger/alerts/alert.py b/dagger/alerts/alert.py index c020d57..8cc2b08 100644 --- a/dagger/alerts/alert.py +++ b/dagger/alerts/alert.py @@ -114,7 +114,7 @@ def airflow_task_fail_alerts(alerts: List[AlertBase], context): alert.execute( task_instance.dag_id, task_instance.task_id, - context["execution_date"], + context.get("logical_date", context.get("execution_date")), run_time, task_instance.log_url, ) diff --git a/dagger/dag_creator/airflow/dag_creator.py b/dagger/dag_creator/airflow/dag_creator.py index 031a3a4..4ca52b0 100644 --- a/dagger/dag_creator/airflow/dag_creator.py +++ b/dagger/dag_creator/airflow/dag_creator.py @@ -35,8 +35,8 @@ def _get_default_args(): @staticmethod def _get_execution_date_fn(from_dag_schedule: str, to_dag_schedule: str): - def execution_date_fn(execution_date, **kwargs): - to_dag_cron = croniter.croniter(to_dag_schedule, execution_date) + def execution_date_fn(logical_date, **kwargs): + to_dag_cron = croniter.croniter(to_dag_schedule, logical_date) to_dag_next_schedule = to_dag_cron.get_next(datetime) from_dag_cron = croniter.croniter(from_dag_schedule, to_dag_next_schedule) diff --git a/dagger/dag_creator/airflow/hooks/sqoop_hook.py b/dagger/dag_creator/airflow/hooks/sqoop_hook.py index 03c8b91..f44610a 100644 --- a/dagger/dag_creator/airflow/hooks/sqoop_hook.py +++ b/dagger/dag_creator/airflow/hooks/sqoop_hook.py @@ -26,7 +26,7 @@ from copy import deepcopy from airflow.exceptions import AirflowException -from airflow.hooks.base_hook import BaseHook +from airflow.hooks.base import BaseHook class SqoopHook(BaseHook): diff --git a/dagger/dag_creator/airflow/operator_creators/airflow_op_creator.py b/dagger/dag_creator/airflow/operator_creators/airflow_op_creator.py index ce842ad..f93ceba 100644 --- a/dagger/dag_creator/airflow/operator_creators/airflow_op_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/airflow_op_creator.py @@ -26,7 +26,6 @@ def _create_operator(self, **kwargs): importlib.import_module(python_module), self._task.function ) params["python_callable"] = python_function - params["provide_context"] = True params["op_kwargs"] = self._template_parameters batch_op = operator_class(dag=self._dag, task_id=self._task.name, **params) diff --git a/dagger/dag_creator/airflow/operator_creators/dummy_creator.py b/dagger/dag_creator/airflow/operator_creators/dummy_creator.py index 9b0d37e..62e1dbe 100644 --- a/dagger/dag_creator/airflow/operator_creators/dummy_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/dummy_creator.py @@ -1,4 +1,4 @@ -from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.empty import EmptyOperator from dagger.dag_creator.airflow.operator_creator import OperatorCreator @@ -11,4 +11,4 @@ def __init__(self, task, dag): def _create_operator(self, **kwargs): params = {**kwargs} - return DummyOperator(dag=self._dag, task_id=self._task.name, **params) + return EmptyOperator(dag=self._dag, task_id=self._task.name, **params) diff --git a/dagger/dag_creator/airflow/operator_creators/python_creator.py b/dagger/dag_creator/airflow/operator_creators/python_creator.py index 89dc546..1b8d435 100644 --- a/dagger/dag_creator/airflow/operator_creators/python_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/python_creator.py @@ -1,7 +1,7 @@ import importlib from os import path -from airflow.operators.python_operator import PythonOperator +from airflow.operators.python import PythonOperator from dagger import conf from dagger.dag_creator.airflow.operator_creator import OperatorCreator @@ -28,7 +28,6 @@ def _create_operator(self, **kwargs): dag=self._dag, task_id=self._task.name, python_callable=python_function, - provide_context=True, op_kwargs=self._template_parameters, **params, ) diff --git a/dagger/dag_creator/airflow/operator_factory.py b/dagger/dag_creator/airflow/operator_factory.py index 079e3a0..a88ce7d 100644 --- a/dagger/dag_creator/airflow/operator_factory.py +++ b/dagger/dag_creator/airflow/operator_factory.py @@ -1,4 +1,4 @@ -from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.empty import EmptyOperator from dagger.dag_creator.airflow.operator_creator import OperatorCreator from dagger.dag_creator.airflow.operator_creators import ( airflow_op_creator, @@ -20,7 +20,7 @@ from dagger.utilities.classes import get_deep_obj_subclasses -class DataOperator(DummyOperator): +class DataOperator(EmptyOperator): ui_color = "#e8f7e4" def __init__(self, *args, **kwargs): diff --git a/dagger/dag_creator/airflow/operators/aws_athena_operator.py b/dagger/dag_creator/airflow/operators/aws_athena_operator.py index 03cf96d..598067a 100644 --- a/dagger/dag_creator/airflow/operators/aws_athena_operator.py +++ b/dagger/dag_creator/airflow/operators/aws_athena_operator.py @@ -21,7 +21,6 @@ from uuid import uuid4 from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator -from airflow.utils.decorators import apply_defaults from dagger.dag_creator.airflow.hooks.aws_athena_hook import AWSAthenaHook from dagger.utilities.randomise import generate_random_name from tenacity import retry, stop_after_attempt, wait_fixed @@ -53,7 +52,6 @@ class AWSAthenaOperator(DaggerBaseOperator): template_fields = ('query', 'database', 'output_location') template_ext = ('.sql', ) - @apply_defaults def __init__(self, query, database, s3_tmp_results_location, s3_output_location, output_table, is_incremental, partitioned_by=None, output_format=None, aws_conn_id='aws_default', client_request_token=None, query_execution_context=None, result_configuration=None, sleep_time=30, max_tries=None, diff --git a/dagger/dag_creator/airflow/operators/aws_glue_job_operator.py b/dagger/dag_creator/airflow/operators/aws_glue_job_operator.py index b75e895..05cb668 100644 --- a/dagger/dag_creator/airflow/operators/aws_glue_job_operator.py +++ b/dagger/dag_creator/airflow/operators/aws_glue_job_operator.py @@ -19,8 +19,7 @@ from typing import Optional from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator -from airflow.utils.decorators import apply_defaults -from airflow.contrib.hooks.aws_hook import AwsHook +from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from airflow.exceptions import AirflowException from dagger.dag_creator.airflow.utils.decorators import lazy_property @@ -48,7 +47,6 @@ class AwsGlueJobOperator(DaggerBaseOperator): template_ext = () ui_color = '#ededed' - @apply_defaults def __init__( self, *, @@ -66,7 +64,7 @@ def __init__( @lazy_property def logs_client(self): - return AwsHook(aws_conn_id=self.aws_conn_id, client_type="logs").get_client_type( + return AwsBaseHook(aws_conn_id=self.aws_conn_id, client_type="logs").get_client_type( "logs", region_name=self.region_name ) diff --git a/dagger/dag_creator/airflow/operators/dagger_base_operator.py b/dagger/dag_creator/airflow/operators/dagger_base_operator.py index cb941d9..854fbec 100644 --- a/dagger/dag_creator/airflow/operators/dagger_base_operator.py +++ b/dagger/dag_creator/airflow/operators/dagger_base_operator.py @@ -1,9 +1,7 @@ from airflow.models import BaseOperator -from airflow.utils.decorators import apply_defaults class DaggerBaseOperator(BaseOperator): - @apply_defaults def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) diff --git a/dagger/dag_creator/airflow/operators/postgres_operator.py b/dagger/dag_creator/airflow/operators/postgres_operator.py index ce90250..2d20312 100644 --- a/dagger/dag_creator/airflow/operators/postgres_operator.py +++ b/dagger/dag_creator/airflow/operators/postgres_operator.py @@ -1,7 +1,6 @@ from typing import Iterable, Mapping, Optional, Union from airflow.providers.postgres.hooks.postgres import PostgresHook -from airflow.utils.decorators import apply_defaults from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator @@ -28,7 +27,6 @@ class PostgresOperator(DaggerBaseOperator): template_ext = (".sql",) ui_color = "#ededed" - @apply_defaults def __init__( self, sql: str, diff --git a/dagger/dag_creator/airflow/operators/redshift_sql_operator.py b/dagger/dag_creator/airflow/operators/redshift_sql_operator.py index 3319828..eeed62f 100644 --- a/dagger/dag_creator/airflow/operators/redshift_sql_operator.py +++ b/dagger/dag_creator/airflow/operators/redshift_sql_operator.py @@ -19,7 +19,6 @@ from airflow.models import BaseOperator from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook -from airflow.www import utils as wwwutils if TYPE_CHECKING: from airflow.utils.context import Context @@ -45,10 +44,7 @@ class RedshiftSQLOperator(BaseOperator): template_fields: Sequence[str] = ('sql',) template_ext: Sequence[str] = ('.sql',) - # TODO: Remove renderer check when the provider has an Airflow 2.3+ requirement. - template_fields_renderers = { - "sql": "postgresql" if "postgresql" in wwwutils.get_attr_renderer() else "sql" - } + template_fields_renderers = {"sql": "postgresql"} def __init__( self, diff --git a/dagger/dag_creator/airflow/operators/snowflake_operator.py b/dagger/dag_creator/airflow/operators/snowflake_operator.py index 7e56320..e7df026 100644 --- a/dagger/dag_creator/airflow/operators/snowflake_operator.py +++ b/dagger/dag_creator/airflow/operators/snowflake_operator.py @@ -1,5 +1,4 @@ -from airflow.contrib.hooks.snowflake_hook import SnowflakeHook -from airflow.utils.decorators import apply_defaults +from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook # noqa: requires apache-airflow-providers-snowflake from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator @@ -30,7 +29,6 @@ class SnowflakeOperator(DaggerBaseOperator): template_ext = (".sql",) ui_color = "#ededed" - @apply_defaults def __init__( self, sql, diff --git a/dagger/dag_creator/airflow/operators/spark_submit_operator.py b/dagger/dag_creator/airflow/operators/spark_submit_operator.py index 31f6a70..af5d77f 100644 --- a/dagger/dag_creator/airflow/operators/spark_submit_operator.py +++ b/dagger/dag_creator/airflow/operators/spark_submit_operator.py @@ -4,8 +4,6 @@ import boto3 from airflow.exceptions import AirflowException -from airflow.utils.decorators import apply_defaults - from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator ENV = os.environ["ENV"].lower() @@ -16,7 +14,6 @@ class SparkSubmitOperator(DaggerBaseOperator): ui_color = "bisque" template_fields = ("job_args", "spark_args", "spark_conf_args") - @apply_defaults def __init__( self, job_file, diff --git a/dagger/dag_creator/airflow/operators/sqoop_operator.py b/dagger/dag_creator/airflow/operators/sqoop_operator.py index 435c4ce..b345b2c 100644 --- a/dagger/dag_creator/airflow/operators/sqoop_operator.py +++ b/dagger/dag_creator/airflow/operators/sqoop_operator.py @@ -2,7 +2,6 @@ import signal from airflow.exceptions import AirflowException -from airflow.utils.decorators import apply_defaults from dagger.dag_creator.airflow.hooks.sqoop_hook import SqoopHook from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator @@ -42,7 +41,6 @@ class SqoopOperator(DaggerBaseOperator): ) ui_color = "#7D8CA4" - @apply_defaults def __init__( self, conn_id="sqoop_default", diff --git a/dagger/dag_creator/airflow/utils/operator_factories.py b/dagger/dag_creator/airflow/utils/operator_factories.py index f51f1a4..0db93b1 100644 --- a/dagger/dag_creator/airflow/utils/operator_factories.py +++ b/dagger/dag_creator/airflow/utils/operator_factories.py @@ -1,13 +1,12 @@ from functools import partial -from airflow.operators.python_operator import ShortCircuitOperator +from airflow.operators.python import ShortCircuitOperator def make_control_flow(is_dummy_operator_short_circuit, dag): control_flow = ShortCircuitOperator( task_id="dummy-control-flow", dag=dag, - provide_context=True, python_callable=partial(eval_control_flow, is_dummy_operator_short_circuit), ) return control_flow diff --git a/dagger/dag_creator/airflow/utils/slack_alerts.py b/dagger/dag_creator/airflow/utils/slack_alerts.py index 26af5b4..86f4338 100644 --- a/dagger/dag_creator/airflow/utils/slack_alerts.py +++ b/dagger/dag_creator/airflow/utils/slack_alerts.py @@ -1,7 +1,7 @@ import os -from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator -from airflow.hooks.base_hook import BaseHook +from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator +from airflow.hooks.base import BaseHook SLACK_CONN_ID = "slack" ENV = os.environ["ENV"].lower() @@ -38,7 +38,7 @@ def task_success_slack_alert(context): task=context["task_instance"].task_id, dag=context["task_instance"].dag_id, ti=context["task_instance"], - exec_date=context["execution_date"], + exec_date=context.get("logical_date", context.get("execution_date")), run_time=get_task_run_time(context["task_instance"]), log_url=context["task_instance"].log_url, ) @@ -81,7 +81,7 @@ def task_fail_slack_alert(context): task=context["task_instance"].task_id, dag=context["task_instance"].dag_id, ti=context["task_instance"], - exec_date=context["execution_date"], + exec_date=context.get("logical_date", context.get("execution_date")), run_time=get_task_run_time(context["task_instance"]), log_url=context["task_instance"].log_url, ) From 6d3901913f411e9ca2db577e11515a38709bd66c Mon Sep 17 00:00:00 2001 From: Mohamad Hallak <16711801+mrhallak@users.noreply.github.com> Date: Wed, 15 Apr 2026 13:06:51 +0200 Subject: [PATCH 2/2] Rename schedule_interval to schedule (removed in Airflow 3) --- dagger/dag_creator/airflow/dag_creator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dagger/dag_creator/airflow/dag_creator.py b/dagger/dag_creator/airflow/dag_creator.py index 4ca52b0..1d87974 100644 --- a/dagger/dag_creator/airflow/dag_creator.py +++ b/dagger/dag_creator/airflow/dag_creator.py @@ -104,7 +104,7 @@ def _create_dag(self, pipe_id, node): description=pipeline.description, default_args=default_args, start_date=pipeline.start_date, - schedule_interval=pipeline.schedule, + schedule=pipeline.schedule, user_defined_macros=user_defined_macros, **pipeline.parameters, )