Airflow 태스크/DAG가 실패했을 때 Slack으로 통지하는 메커니즘
배경
↑이런 느낌으로 태스크가 실패하면 Slack에게 통지하는 구조를 구현하고 싶었다
구현
slack_noti
는 패키지화하여 각 DAG에서 호출 할 수있게하는 것이 편리합니다
import airflow
import requests
import json
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
WEBHOOK_URL = "https://hooks.slack.com/services/xxxxx"
def slack_noti(context=""):
url= WEBHOOK_URL
requests.post(url, data = json.dumps({
'username': 'Airflow',
'channel': 'some_channel',
'icon_url': 'https://xxx.jpeg',
'attachments': [{
'title': 'Failed: {}.{}.{}'.format(context['dag'], context['task'], context['execution_date']),
"color": 'danger'
}]
}))
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": airflow.utils.dates.days_ago(1),
"retries": 1,
"on_failure_callback": slack_noti,
"retry_delay": timedelta(minutes=1)
}
dag = DAG("slack_dag", default_args=default_args, catchup=False, schedule_interval="@daily")
def success_py():
# 1/0
return 'success'
def fail_py():
1/0
return 'fail'
sp = PythonOperator(
task_id='success_py',
python_callable=success_py,
dag=dag,
)
fp = PythonOperator(
task_id='fail_py',
python_callable=fail_py,
dag=dag,
)
sp >> fp
Reference
이 문제에 관하여(Airflow 태스크/DAG가 실패했을 때 Slack으로 통지하는 메커니즘), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/munaita_/items/1a5b131839e01ea7280d텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)