Apache Airflow 작업 실패를 Teams에 알립니다.

소개



Apache Airflow는 태스크가 실패했을 경우에 호출할 수 있는 on_failure_callback 라고 하는 구조가 있다. 이를 사용하여 Microsoft Teams의 특정 채널에 경고를 POST하는 메커니즘을 만듭니다.

Teams에 알림이 작동하는 방식



Teams에게 메시지를 POST하는 처리는 Logic App 로 작성해, 그것을 호출하도록 Airflow측에서 구현하는 것이 편리할 것 같다. 이번에는 Teams에 POST하는 것만의 처리로 했지만, 이것 이외의 통지 방법(메일 등)이나, 자동화 처리도 필요에 따라서 간단하게 추가할 수 있으므로, 이 부분을 Logic App에 맡긴다고 하는 것은 좋은 아이디어의 같아요.


설정



Logic App 설정



우선 Logic App을 설정한다. HTTP 요청을 받을 때 시작하도록 트리거를 설정하고 Teams에 메시지를 Post하는 간단한 워크플로를 정의합니다.


HTTP 요청 수신 설정



HTTP 요청을 수신할 때 Request Body로 수신할 JSON 스키마를 정의한다. 여기서는 Airflow로부터 건네줄 수 있는 Context의 일부의 값을 받는 것을 상정해 3개의 필드를 정의하고 있다.

정의 JSON Text는 다음과 같습니다.
{
    "properties": {
        "ExecDate": {
            "type": "string"
        },
        "RunID": {
            "type": "string"
        },
        "TaskInstance": {
            "type": "string"
        }
    },
    "type": "object"
}

Teams에 알림 설정



다음에 Teams의 통지의 설정을 한다. 편리하게도 Logic App에는 이미 Teams를 조작하기 위한 부품이 준비되어 있다. 그 중에서 이번은 Post a message (V3) (Preview)를 선택.
Team을 선택하고, Post처의 Channel을 선택해,Message란에 통지하고 싶은 내용을 기재한다. HTTP request body로 받는 항목을 사용하여 아래 그림과 같이 조립했다.


알림 테스트용 DAG 만들기


default_argson_failure_callback 를 지정하면, 어떤 태스크라도 실패했을 경우에 통지할 수 있도록 할 수 있다. post_teams_channel 라는 함수를 작성해, on_failure_callback 가 발화했을 때에 호출할 수 있도록(듯이) 했다. 여기에서는 성공하는 태스크 success 와 실패하는 태스크 failure 를 모두 움직여 거동을 조사한다.
import airflow
import requests
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Logic AppにHTTP POSTリクエストを送信する。ContextからDAGの情報をrequest bodyに渡す。
def post_teams_channel(context):
    url = "https://<Logic App URL>"
    headers = {'content-type': 'application/json'}
    payload={
       'TaskInstance': str(context['task_instance_key_str']),
       'RunID': str(context['run_id']),
       'ExecDate': str(context['execution_date'])
    }
    r = requests.post(url, headers=headers, json=payload)

# on_failure_callbackをここに実装
args = {
    "owner": "airflow",
    "email": ["[email protected]"],
    "depends_on_past": False,
    "on_failure_callback": post_teams_channel,
    "start_date": airflow.utils.dates.days_ago(0)
}

# DAGの作成
dag = DAG(dag_id="teams-notify",
        default_args=args,
        schedule_interval="@daily")

# 成功するタスク "success"
t1 = BashOperator(
    task_id='success',
    bash_command='exit 0',
    dag=dag,
)

# 失敗するタスク "failure"
t2 = BashOperator(
    task_id='failure',
    bash_command='exit 1',
    dag=dag,
)

# タスクの依存関係定義
t1 >> t2

알림 테스트



준비가 완료되었으므로 실제로 DAG를 트리거하여 동작을 테스트해 본다. 다음 명령을 실행하여 즉시 트리거합니다.
airflow dags trigger teams-notify

잠시 후 task_id : success가 성공하고 task_id : failure가 실패한 결과를 볼 수 있습니다.

Teams에는 Logic App에서 정의한 내용으로 메시지가 Post되어 있는 것을 확인할 수 있다.

좋은 웹페이지 즐겨찾기