Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- hive
- 물리삭제
- 계정 관리
- 티스토리챌린지
- 오블완
- nginx
- unique constraint
- ci/cd
- JWT
- prometheus
- aws ec2
- 논리삭제
- Django
- docker
- node exporter
- soft delete
- logstash
- grafana
- NoSQL
- elasticsearch
- redis
- 로그 백업
- DAG
- slack
- Hadoop
- Airflow
- Next.js
- Locust
- hard delete
- AWS
Archives
- Today
- Total
먹수의 개발일지
Airflow DAG 실패 성공시 Slack 알림 본문
Intro
Airflow DAG를 실행시 실패 혹은 성공했을 때 알림을 받고자 했다.
Slack webhook url은 https://api.slack.com/apps 해당 사이트에서 Create New APP을 통해 새로 생성하거나 기존 webhook url을 확인할 수 있다.
Airflow Callback 작성
dags 폴더 구조는 아래와 같다.
dags
├── plugins
│ └── slack.py
└── test.py
Airflow에서 Callback 종류 *context로 전달되는 매개변수 정보
on_success_callback | Task 성공 시 호출된다. |
on_failure_callback | Task 실패 시 호출된다. |
sla_miss_callback | Task가 정해진 SLA를 충족하지 못할 때 호출된다. |
on_retry_callback | Task가 재실행될 때 호출된다. |
Airflow는 Task 실행시 여러 변수를 수집하여 execute()의 context 매개변수로 전달한다. context 매개변수에는 현재 작업에 대한 정보가 저장되어 있다.
/dags/plugins/slack.py
from airflow.models import Variable
import logging
import requests
def on_failure_callback(context):
"""
:return: operator.execute
"""
message = """
:red_circle: Task Failed.
*Dag*: {dag}
*Task*: {task}
*Execution Time*: {exec_date}
*Exception*: {exception}
*Log Url*: {log_url}
""".format(
dag=context.get('task_instance').dag_id,
task=context.get('task_instance').task_id,
exec_date=context.get('execution_date'),
exception=context.get('exception'),
log_url=context.get('task_instance').log_url
)
emoji = ":crying_cat_face:"
send_message_to_a_slack_channel(message, emoji)
# def send_message_to_a_slack_channel(message, emoji, channel, access_token):
def send_message_to_a_slack_channel(message, emoji):
url = "https://hooks.slack.com/services/"+Variable.get("slack_url")
headers = {
'content-type': 'application/json',
}
data = { "username": "Airflow", "text": message, "icon_emoji": emoji }
r = requests.post(url, json=data, headers=headers)
return r
DAG에 on_failure_callaback 함수 추가
dags/test.py
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.postgres_operator import PostgresOperator
from plugins import slack
from airflow.operators.hadoop_operator import HadoopOperator
DAG_ID = "postgres_operator_dag"
default_args = {
'owner': 'airflow',
'schedule_interval': '@hourly',
'start_date': datetime(2024, 11, 07),
'tags': ['shop'],
# 'retries': 1,
'on_failure_callback': slack.on_failure_callback, # 실패시 SLACK 함수 요청
'on_success_callback': slack.on_failure_callback, # 성공시 SLACK 함수 요청
}
with DAG(
dag_id=DAG_ID,
default_args=default_args,
catchup=False,
schedule_interval = '12 23 * * *',
) as dag:
get_products = PostgresOperator(
task_id="get_products",
sql="SELECT * FROM product",
# parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
# hook_params={"options": "-c statement_timeout=3000ms"},
)
save_data_to_hadoop = HadoopOperator(
task_id="save_data_to_hadoop",
command="hadoop fs -put /path/to/my/data /path/to/hadoop/file",
hadoop_conn_id="my_hadoop_connection",
dag=dag,
)
slack webhook url
slack을 미리 생성해둔 채널로 가도록 연동한다.
https://api.slack.com/messaging/webhooks
webhook URL을 복사해준다. services/ 뒤에 있는 값을 활용하게 된다.
Airflow의 Admin > Variables 탭에 들어가 위에서 얻은 url값을 저장해준다.
Slack message
일부러 없는 테이블명으로 조회를 해보았다. 실패시 아래와 같은 알림 메시지를 받을 수 있다.
'데이터 엔지니어링 > Airflow' 카테고리의 다른 글
[Airflow] airflow 계정 관리 (계정 조회/생성/삭제) (0) | 2024.11.17 |
---|---|
Debugging Airflow in a Containered with VS Code (0) | 2024.08.21 |
Comments