Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dagger/alerts/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def airflow_task_fail_alerts(alerts: List[AlertBase], context):
alert.execute(
task_instance.dag_id,
task_instance.task_id,
context["execution_date"],
context.get("logical_date", context.get("execution_date")),
run_time,
task_instance.log_url,
)
6 changes: 3 additions & 3 deletions dagger/dag_creator/airflow/dag_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def _get_default_args():

@staticmethod
def _get_execution_date_fn(from_dag_schedule: str, to_dag_schedule: str):
def execution_date_fn(execution_date, **kwargs):
to_dag_cron = croniter.croniter(to_dag_schedule, execution_date)
def execution_date_fn(logical_date, **kwargs):
to_dag_cron = croniter.croniter(to_dag_schedule, logical_date)
to_dag_next_schedule = to_dag_cron.get_next(datetime)

from_dag_cron = croniter.croniter(from_dag_schedule, to_dag_next_schedule)
Expand Down Expand Up @@ -104,7 +104,7 @@ def _create_dag(self, pipe_id, node):
description=pipeline.description,
default_args=default_args,
start_date=pipeline.start_date,
schedule_interval=pipeline.schedule,
schedule=pipeline.schedule,
user_defined_macros=user_defined_macros,
**pipeline.parameters,
)
Expand Down
2 changes: 1 addition & 1 deletion dagger/dag_creator/airflow/hooks/sqoop_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from copy import deepcopy

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.base import BaseHook


class SqoopHook(BaseHook):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ def _create_operator(self, **kwargs):
importlib.import_module(python_module), self._task.function
)
params["python_callable"] = python_function
params["provide_context"] = True
params["op_kwargs"] = self._template_parameters

batch_op = operator_class(dag=self._dag, task_id=self._task.name, **params)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.empty import EmptyOperator
from dagger.dag_creator.airflow.operator_creator import OperatorCreator


Expand All @@ -11,4 +11,4 @@ def __init__(self, task, dag):
def _create_operator(self, **kwargs):
params = {**kwargs}

return DummyOperator(dag=self._dag, task_id=self._task.name, **params)
return EmptyOperator(dag=self._dag, task_id=self._task.name, **params)
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib
from os import path

from airflow.operators.python_operator import PythonOperator
from airflow.operators.python import PythonOperator
from dagger import conf
from dagger.dag_creator.airflow.operator_creator import OperatorCreator

Expand All @@ -28,7 +28,6 @@ def _create_operator(self, **kwargs):
dag=self._dag,
task_id=self._task.name,
python_callable=python_function,
provide_context=True,
op_kwargs=self._template_parameters,
**params,
)
Expand Down
4 changes: 2 additions & 2 deletions dagger/dag_creator/airflow/operator_factory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.empty import EmptyOperator
from dagger.dag_creator.airflow.operator_creator import OperatorCreator
from dagger.dag_creator.airflow.operator_creators import (
airflow_op_creator,
Expand All @@ -20,7 +20,7 @@
from dagger.utilities.classes import get_deep_obj_subclasses


class DataOperator(DummyOperator):
class DataOperator(EmptyOperator):
ui_color = "#e8f7e4"

def __init__(self, *args, **kwargs):
Expand Down
2 changes: 0 additions & 2 deletions dagger/dag_creator/airflow/operators/aws_athena_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from uuid import uuid4

from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator
from airflow.utils.decorators import apply_defaults
from dagger.dag_creator.airflow.hooks.aws_athena_hook import AWSAthenaHook
from dagger.utilities.randomise import generate_random_name
from tenacity import retry, stop_after_attempt, wait_fixed
Expand Down Expand Up @@ -53,7 +52,6 @@ class AWSAthenaOperator(DaggerBaseOperator):
template_fields = ('query', 'database', 'output_location')
template_ext = ('.sql', )

@apply_defaults
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we don't need to decorate with apply_defaults aymore?

def __init__(self, query, database, s3_tmp_results_location, s3_output_location, output_table, is_incremental,
partitioned_by=None, output_format=None, aws_conn_id='aws_default', client_request_token=None,
query_execution_context=None, result_configuration=None, sleep_time=30, max_tries=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
from typing import Optional

from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.exceptions import AirflowException

from dagger.dag_creator.airflow.utils.decorators import lazy_property
Expand Down Expand Up @@ -48,7 +47,6 @@ class AwsGlueJobOperator(DaggerBaseOperator):
template_ext = ()
ui_color = '#ededed'

@apply_defaults
def __init__(
self,
*,
Expand All @@ -66,7 +64,7 @@ def __init__(

@lazy_property
def logs_client(self):
return AwsHook(aws_conn_id=self.aws_conn_id, client_type="logs").get_client_type(
return AwsBaseHook(aws_conn_id=self.aws_conn_id, client_type="logs").get_client_type(
"logs", region_name=self.region_name
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class DaggerBaseOperator(BaseOperator):
@apply_defaults
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

Expand Down
2 changes: 0 additions & 2 deletions dagger/dag_creator/airflow/operators/postgres_operator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Iterable, Mapping, Optional, Union

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.decorators import apply_defaults
from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator


Expand All @@ -28,7 +27,6 @@ class PostgresOperator(DaggerBaseOperator):
template_ext = (".sql",)
ui_color = "#ededed"

@apply_defaults
def __init__(
self,
sql: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
from airflow.www import utils as wwwutils

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand All @@ -45,10 +44,7 @@ class RedshiftSQLOperator(BaseOperator):

template_fields: Sequence[str] = ('sql',)
template_ext: Sequence[str] = ('.sql',)
# TODO: Remove renderer check when the provider has an Airflow 2.3+ requirement.
template_fields_renderers = {
"sql": "postgresql" if "postgresql" in wwwutils.get_attr_renderer() else "sql"
}
template_fields_renderers = {"sql": "postgresql"}

def __init__(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
from airflow.utils.decorators import apply_defaults
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook # noqa: requires apache-airflow-providers-snowflake
from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator


Expand Down Expand Up @@ -30,7 +29,6 @@ class SnowflakeOperator(DaggerBaseOperator):
template_ext = (".sql",)
ui_color = "#ededed"

@apply_defaults
def __init__(
self,
sql,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

import boto3
from airflow.exceptions import AirflowException
from airflow.utils.decorators import apply_defaults

from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator

ENV = os.environ["ENV"].lower()
Expand All @@ -16,7 +14,6 @@ class SparkSubmitOperator(DaggerBaseOperator):
ui_color = "bisque"
template_fields = ("job_args", "spark_args", "spark_conf_args")

@apply_defaults
def __init__(
self,
job_file,
Expand Down
2 changes: 0 additions & 2 deletions dagger/dag_creator/airflow/operators/sqoop_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import signal

from airflow.exceptions import AirflowException
from airflow.utils.decorators import apply_defaults
from dagger.dag_creator.airflow.hooks.sqoop_hook import SqoopHook
from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator

Expand Down Expand Up @@ -42,7 +41,6 @@ class SqoopOperator(DaggerBaseOperator):
)
ui_color = "#7D8CA4"

@apply_defaults
def __init__(
self,
conn_id="sqoop_default",
Expand Down
3 changes: 1 addition & 2 deletions dagger/dag_creator/airflow/utils/operator_factories.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from functools import partial

from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.python import ShortCircuitOperator


def make_control_flow(is_dummy_operator_short_circuit, dag):
control_flow = ShortCircuitOperator(
task_id="dummy-control-flow",
dag=dag,
provide_context=True,
python_callable=partial(eval_control_flow, is_dummy_operator_short_circuit),
)
return control_flow
Expand Down
8 changes: 4 additions & 4 deletions dagger/dag_creator/airflow/utils/slack_alerts.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.hooks.base_hook import BaseHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.hooks.base import BaseHook

SLACK_CONN_ID = "slack"
ENV = os.environ["ENV"].lower()
Expand Down Expand Up @@ -38,7 +38,7 @@ def task_success_slack_alert(context):
task=context["task_instance"].task_id,
dag=context["task_instance"].dag_id,
ti=context["task_instance"],
exec_date=context["execution_date"],
exec_date=context.get("logical_date", context.get("execution_date")),
run_time=get_task_run_time(context["task_instance"]),
log_url=context["task_instance"].log_url,
)
Expand Down Expand Up @@ -81,7 +81,7 @@ def task_fail_slack_alert(context):
task=context["task_instance"].task_id,
dag=context["task_instance"].dag_id,
ti=context["task_instance"],
exec_date=context["execution_date"],
exec_date=context.get("logical_date", context.get("execution_date")),
run_time=get_task_run_time(context["task_instance"]),
log_url=context["task_instance"].log_url,
)
Expand Down
Loading