Apache Airflow 작업 실패를 Teams에 알립니다.
소개
Apache Airflow는 태스크가 실패했을 경우에 호출할 수 있는
on_failure_callback
라고 하는 구조가 있다. 이를 사용하여 Microsoft Teams의 특정 채널에 경고를 POST하는 메커니즘을 만듭니다.Teams에 알림이 작동하는 방식
Teams에게 메시지를 POST하는 처리는 Logic App 로 작성해, 그것을 호출하도록 Airflow측에서 구현하는 것이 편리할 것 같다. 이번에는 Teams에 POST하는 것만의 처리로 했지만, 이것 이외의 통지 방법(메일 등)이나, 자동화 처리도 필요에 따라서 간단하게 추가할 수 있으므로, 이 부분을 Logic App에 맡긴다고 하는 것은 좋은 아이디어의 같아요.

설정
Logic App 설정
우선 Logic App을 설정한다. HTTP 요청을 받을 때 시작하도록 트리거를 설정하고 Teams에 메시지를 Post하는 간단한 워크플로를 정의합니다.

HTTP 요청 수신 설정
HTTP 요청을 수신할 때 Request Body로 수신할 JSON 스키마를 정의한다. 여기서는 Airflow로부터 건네줄 수 있는 Context의 일부의 값을 받는 것을 상정해 3개의 필드를 정의하고 있다.

정의 JSON Text는 다음과 같습니다.
{
"properties": {
"ExecDate": {
"type": "string"
},
"RunID": {
"type": "string"
},
"TaskInstance": {
"type": "string"
}
},
"type": "object"
}
Teams에 알림 설정
다음에 Teams의 통지의 설정을 한다. 편리하게도 Logic App에는 이미 Teams를 조작하기 위한 부품이 준비되어 있다. 그 중에서 이번은
Post a message (V3) (Preview)
를 선택.Team을 선택하고, Post처의 Channel을 선택해,Message란에 통지하고 싶은 내용을 기재한다. HTTP request body로 받는 항목을 사용하여 아래 그림과 같이 조립했다.

알림 테스트용 DAG 만들기
default_args
에 on_failure_callback
를 지정하면, 어떤 태스크라도 실패했을 경우에 통지할 수 있도록 할 수 있다. post_teams_channel
라는 함수를 작성해, on_failure_callback
가 발화했을 때에 호출할 수 있도록(듯이) 했다. 여기에서는 성공하는 태스크 success
와 실패하는 태스크 failure
를 모두 움직여 거동을 조사한다.import airflow
import requests
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# Logic AppにHTTP POSTリクエストを送信する。ContextからDAGの情報をrequest bodyに渡す。
def post_teams_channel(context):
url = "https://<Logic App URL>"
headers = {'content-type': 'application/json'}
payload={
'TaskInstance': str(context['task_instance_key_str']),
'RunID': str(context['run_id']),
'ExecDate': str(context['execution_date'])
}
r = requests.post(url, headers=headers, json=payload)
# on_failure_callbackをここに実装
args = {
"owner": "airflow",
"email": ["[email protected]"],
"depends_on_past": False,
"on_failure_callback": post_teams_channel,
"start_date": airflow.utils.dates.days_ago(0)
}
# DAGの作成
dag = DAG(dag_id="teams-notify",
default_args=args,
schedule_interval="@daily")
# 成功するタスク "success"
t1 = BashOperator(
task_id='success',
bash_command='exit 0',
dag=dag,
)
# 失敗するタスク "failure"
t2 = BashOperator(
task_id='failure',
bash_command='exit 1',
dag=dag,
)
# タスクの依存関係定義
t1 >> t2
알림 테스트
준비가 완료되었으므로 실제로 DAG를 트리거하여 동작을 테스트해 본다. 다음 명령을 실행하여 즉시 트리거합니다.
airflow dags trigger teams-notify
잠시 후
task_id : success
가 성공하고 task_id : failure
가 실패한 결과를 볼 수 있습니다.
Teams에는 Logic App에서 정의한 내용으로 메시지가 Post되어 있는 것을 확인할 수 있다.

Reference
이 문제에 관하여(Apache Airflow 작업 실패를 Teams에 알립니다.), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/whata/items/dba1c2db1a38d9d4a4c0텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)