Airflow 태스크/DAG가 실패했을 때 Slack으로 통지하는 메커니즘

6859 단어 슬랙Airflow

배경





↑이런 느낌으로 태스크가 실패하면 Slack에게 통지하는 구조를 구현하고 싶었다

구현


  • Slack에 대한 통지는 Webhook을 이용한 API를 이용한다 -> Sending messages using Incoming Webhooks
  • 각 Task에 Slack 통지 처리의 임베드는 Operator의 생성자 인수의 on_failure_callback 를 이용한다
  • DAG의 입자 크기로 모니터링하고 싶다면 DAG의 생성자 인수 on_failure_callback

  • slack_noti는 패키지화하여 각 DAG에서 호출 할 수있게하는 것이 편리합니다
  • 
    import airflow
    import requests
    import json
    from datetime import timedelta
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    
    WEBHOOK_URL = "https://hooks.slack.com/services/xxxxx"
    
    def slack_noti(context=""):
        url= WEBHOOK_URL
        requests.post(url, data = json.dumps({
            'username': 'Airflow',
            'channel': 'some_channel',
            'icon_url': 'https://xxx.jpeg',
            'attachments': [{
                'title': 'Failed: {}.{}.{}'.format(context['dag'], context['task'], context['execution_date']),
                "color": 'danger'
                }]
            }))
    
    default_args = {
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": airflow.utils.dates.days_ago(1),
        "retries": 1,
        "on_failure_callback": slack_noti,
        "retry_delay": timedelta(minutes=1)
    }
    
    dag = DAG("slack_dag", default_args=default_args, catchup=False, schedule_interval="@daily")
    
    def success_py():
        # 1/0
        return 'success'
    
    def fail_py():
        1/0
        return 'fail'
    
    sp = PythonOperator(
        task_id='success_py',
        python_callable=success_py,
        dag=dag,
    )
    
    fp = PythonOperator(
        task_id='fail_py',
        python_callable=fail_py,
        dag=dag,
    )
    
    sp >> fp
    

    좋은 웹페이지 즐겨찾기