관리 워크플로우 이메일 알림

12644 단어 AWSairflow

e-메일 알림


기본 워크플로우 이해
https://qiita.com/pioho07/items/97ebd1351916177d50f3
여기에 오류가 발생했을 때의 메일 알림과 성공했을 때의 메일 알림 처리를 추가합니다.

이번에는 SMTP 서버에서 SES를 사용합니다.


SES의 SMTP 설정


다음 화면에서 STMP 설정 을 클릭한 다음 내 SMTP Credentials 만들기 를 클릭합니다.
자격 증명 메모 (사용자/암호)
ServerName (email-smtp.ap-northeast-1.amazonaws.com)

SES에서 메일 주소 확인


SES는 검증된 메일 주소만 보낼 수 있기 때문에 이번에 사용한 메일 주소를 검증합니다.
Email Addresses를 클릭한 다음 Cerify a New Email Address를 클릭합니다.임의의 메일을 넣고 보낸 메일의 본문 링크를 누르면 검증이 완료됩니다.

WAA 설정 값

  • core.default_ui_timezone: Asia/Tokyo
  • email.email_backend: airflow.utils.email.send_email_smtp
  • scheduler.catchup_by_default: False
  • smtp.smtp_host: email-smtp.ap-northeast-1.amazonaws.com(※ 기록된 SES의 SMTP 끝점)
  • smtp.smtp_mail_from: [email protected]
  • smtp.smtp_password:xxxxxxxxxxxxxxxxxxx(※ SMTP 암호 기록)
  • smtp.smtp_port: 587
  • smtp.smtp_ssl: False
  • smtp.smtp_starttls: True
  • smtp.smtp_user:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
  • 이런 느낌. 에어플로우.cfg의 부분 (core나 smtp 등) 을 머리에 끼고 점을 붙여서 옵션을 설정합니다.공식 문서와는 좀 달라요.smtp.smtp_user 같은 건 문서에 실리지 않았지만 있는 것 같아요.

    Airflow DAG


    주요 변경점은
    * email_on_실패 메일을 failer로 보내기
    * EmailOperator를 통한 성공 메시지 발송
    *retries:1 재시도
    *raise를 통해 실패 테스트를 진행합니다(이하 DAG의 raise 부분에 대한 설명 제거).
    import boto3
    import time
    import airflow
    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.email_operator import EmailOperator
    
    args = {
        "owner": "airflow",
        "start_date": airflow.utils.dates.days_ago(1),
        "provide_context": False,
        "email_on_failure": True,
        "email_on_retry": True,
        "retries":1,
        "retry_delay":120,
        "email": "[email protected]"
    }
    
    etl_job_name = "se2_job1"
    etl_crwl_name = "se2_out1"
    
    glue_client = boto3.client('glue')
    
    def glue_etl_job():
    ## fail test
    ##    raise Exception('エラーテスト')
        job = glue_client.start_job_run(JobName=etl_job_name)
        while True:
            status = glue_client.get_job_run(JobName=etl_job_name, RunId=job['JobRunId'])
            if status['JobRun']['JobRunState'] == 'SUCCEEDED':
                break
            time.sleep(60)
    
    def glue_crawler_job():
        glue_client.start_crawler(Name=etl_crwl_name)
        time.sleep(10)
        while True:
            status = glue_client.get_crawler(Name=etl_crwl_name)
            if status['Crawler']['LastCrawl']['Status'] == 'SUCCEEDED':
                break
            time.sleep(60)
    
    with DAG(
        dag_id="demo_etl_job",
        description="Simple glue DAG",
        default_args=args,
        schedule_interval="*/60 * * * *",
    #    catchup=False,
        tags=['demo']
    ) as dag:
        t1 = PythonOperator(task_id="glue_job_step", python_callable=glue_etl_job)
        t2 = PythonOperator(task_id="glue_crawler_step", python_callable=glue_crawler_job)
        t3 = EmailOperator(
        mime_charset='utf-8',
        task_id='success_email_step',
        to='[email protected]',
        subject='Airflow processing report',
        html_content='おめでとう。成功です!'
        )
    
        t1 >> t2>> t3
    

    흐름도



    테스트


    처리 완료 메일 알림



    축하합니다.성공했어!

    첫 번째 단계 실패, 오류 메일 알림



    공기 흐름 로그 오류 출력
    1 런타임 로그

    실행 중인 로그 다시 시도하기
    재시도까지의 간격도 설정과 마찬가지로 120초 정도입니다.

    잘못된 부분

    받은 오류 메일
    처음
    Airflow alert: <TaskInstance: demo_etl_job.glue_job_step 2020-12-13T10:00:00+00:00 [up_for_retry]>
    
    Try 1 out of 2
    Exception:
    Failed attempt to attach error logs
    Log: Link
    Host: ip-10-1-3-72.ap-northeast-1.compute.internal
    Log file: /usr/local/airflow/logs/demo_etl_job/glue_job_step/2020-12-13T10:00:00+00:00.log
    Mark success: Link
    
    두 번째
    Airflow alert: <TaskInstance: demo_etl_job.glue_job_step 2020-12-13T10:00:00+00:00 [failed]>
    
    Try 2 out of 2
    Exception:
    Failed attempt to attach error logs
    Log: Link
    Host: ip-10-1-3-72.ap-northeast-1.compute.internal
    Log file: /usr/local/airflow/logs/demo_etl_job/glue_job_step/2020-12-13T10:00:00+00:00.log
    Mark success: Link
    
    다시 시도하기 1이라 메일을 두 번 보냈어요.0을 다시 시도하려면 메일도 한 번밖에 없습니다.

    참고 자료


    AWS MWAA 공식
    https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-env-variables.html
    github
    https://github.com/apache/airflow

    좋은 웹페이지 즐겨찾기