먹수의 개발일지

Airflow DAG 실패 성공시 Slack 알림 본문

데이터 엔지니어링/Airflow

Airflow DAG 실패 성공시 Slack 알림

icandle 2024. 11. 7. 10:17

Intro

Airflow DAG를 실행시 실패 혹은 성공했을 때 알림을 받고자 했다.
Slack webhook url은 https://api.slack.com/apps 해당 사이트에서 Create New APP을 통해 새로 생성하거나 기존 webhook url을 확인할 수 있다.

Airflow Callback 작성

dags 폴더 구조는 아래와 같다.

dags
├── plugins
│   └── slack.py
└── test.py

 

 

Airflow에서 Callback 종류 *context로 전달되는 매개변수 정보

on_success_callback Task 성공 시 호출된다.
on_failure_callback Task 실패 시 호출된다.
sla_miss_callback Task가 정해진 SLA를 충족하지 못할 때 호출된다.
on_retry_callback Task가 재실행될 때 호출된다.

 

Airflow는 Task 실행시 여러 변수를 수집하여 execute()의 context 매개변수로 전달한다. context 매개변수에는 현재 작업에 대한 정보가 저장되어 있다.

 

 

/dags/plugins/slack.py

from airflow.models import Variable

import logging
import requests

def on_failure_callback(context):
    """
    :return: operator.execute
    """
    message = """
            :red_circle: Task Failed.
            *Dag*: {dag}
            *Task*: {task}
            *Execution Time*: {exec_date}
            *Exception*: {exception}
            *Log Url*: {log_url}
            """.format(
        dag=context.get('task_instance').dag_id,
        task=context.get('task_instance').task_id,
        exec_date=context.get('execution_date'),
        exception=context.get('exception'),
        log_url=context.get('task_instance').log_url
    )
    emoji = ":crying_cat_face:"

    send_message_to_a_slack_channel(message, emoji)


# def send_message_to_a_slack_channel(message, emoji, channel, access_token):
def send_message_to_a_slack_channel(message, emoji):
    url = "https://hooks.slack.com/services/"+Variable.get("slack_url")
    headers = {
        'content-type': 'application/json',
    }
    data = { "username": "Airflow", "text": message, "icon_emoji": emoji }
    r = requests.post(url, json=data, headers=headers)
    return r

 

DAG에 on_failure_callaback 함수 추가

dags/test.py

from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.postgres_operator import PostgresOperator
from plugins import slack
from airflow.operators.hadoop_operator import HadoopOperator

DAG_ID = "postgres_operator_dag" 

default_args = {
    'owner': 'airflow',
    'schedule_interval': '@hourly',
    'start_date': datetime(2024, 11, 07),
    'tags': ['shop'],
    # 'retries': 1,
    'on_failure_callback': slack.on_failure_callback, # 실패시 SLACK 함수 요청
    'on_success_callback': slack.on_failure_callback, # 성공시 SLACK 함수 요청
}

with DAG(
    dag_id=DAG_ID,
    default_args=default_args, 
    catchup=False,
    schedule_interval = '12 23 * * *', 
) as dag:
    get_products = PostgresOperator(
        task_id="get_products",
        sql="SELECT * FROM product",
        # parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
        # hook_params={"options": "-c statement_timeout=3000ms"},
    )

    save_data_to_hadoop = HadoopOperator(
        task_id="save_data_to_hadoop",
        command="hadoop fs -put /path/to/my/data /path/to/hadoop/file",
        hadoop_conn_id="my_hadoop_connection",
        dag=dag,
    )

 

slack webhook url

slack을 미리 생성해둔 채널로 가도록 연동한다.

https://api.slack.com/messaging/webhooks

webhook URL을 복사해준다. services/ 뒤에 있는 값을 활용하게 된다.

 

Airflow의 Admin > Variables 탭에 들어가 위에서 얻은 url값을 저장해준다.

 

 

Slack message

일부러 없는 테이블명으로 조회를 해보았다. 실패시 아래와 같은 알림 메시지를 받을 수 있다.

Comments