diff --git a/dagger/alerts/alert.py b/dagger/alerts/alert.py index 8cc2b08..893b150 100644 --- a/dagger/alerts/alert.py +++ b/dagger/alerts/alert.py @@ -2,6 +2,7 @@ from abc import ABC, abstractmethod from typing import List +from airflow.utils.types import DagRunType from slack.web.client import WebClient from dagger import conf from dagger.utilities.config_validator import Attribute, ConfigValidator @@ -98,9 +99,9 @@ def get_task_run_time(task_instance): def airflow_task_fail_alerts(alerts: List[AlertBase], context): if conf.ENV == "datatst": return - if context["dag_run"].external_trigger is True: + if context["dag_run"].run_type == DagRunType.MANUAL: return - if context["dag"].is_paused is True: + if getattr(context["dag"], "is_paused", False): return task_instance = context["task_instance"] diff --git a/dagger/dag_creator/airflow/utils/slack_alerts.py b/dagger/dag_creator/airflow/utils/slack_alerts.py index 86f4338..3da62f3 100644 --- a/dagger/dag_creator/airflow/utils/slack_alerts.py +++ b/dagger/dag_creator/airflow/utils/slack_alerts.py @@ -1,7 +1,7 @@ import os from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator -from airflow.hooks.base import BaseHook +from airflow.utils.types import DagRunType SLACK_CONN_ID = "slack" ENV = os.environ["ENV"].lower() @@ -11,6 +11,16 @@ def get_task_run_time(task_instance): return (task_instance.end_date - task_instance.start_date).total_seconds() +def _should_skip_alert(context): + if ENV == "datatst": + return True + if context["dag_run"].run_type == DagRunType.MANUAL: + return True + if getattr(context["dag"], "is_paused", False): + return True + return False + + def task_success_slack_alert(context): """ Callback task that can be used in DAG to alert of successful task completion @@ -19,14 +29,9 @@ def task_success_slack_alert(context): Returns: None: Calls the SlackWebhookOperator execute method internally """ - if ENV == "datatst": - return - if context["dag_run"].external_trigger is True: - return - if context["dag"].is_paused is True: + if _should_skip_alert(context): return - slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password slack_msg = """ :large_blue_circle: Task Succeeded! *Task*: {task} @@ -45,8 +50,7 @@ def task_success_slack_alert(context): success_alert = SlackWebhookOperator( task_id="slack_test", - http_conn_id="slack", - webhook_token=slack_webhook_token, + slack_webhook_conn_id=SLACK_CONN_ID, message=slack_msg, username="airflow", ) @@ -62,14 +66,9 @@ def task_fail_slack_alert(context): Returns: None: Calls the SlackWebhookOperator execute method internally """ - if ENV == "datatst": - return - if context["dag_run"].external_trigger is True: - return - if context["dag"].is_paused is True: + if _should_skip_alert(context): return - slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password slack_msg = """ :red_circle: Task Failed. *Task*: {task} @@ -88,8 +87,7 @@ def task_fail_slack_alert(context): failed_alert = SlackWebhookOperator( task_id=context["task_instance"].task_id, - http_conn_id=SLACK_CONN_ID, - webhook_token=slack_webhook_token, + slack_webhook_conn_id=SLACK_CONN_ID, message=slack_msg, username="airflow", )