첫 번째 공기 흐름 파이프를 작성하는 방법

이 강좌는 Apache Airflow 를 사용하여 첫 번째 작업 흐름을 만드는 데 필요한 기본 절차를 논의했다.
시작하기 전에 본고에서 논의한 모든 절차를 따를 수 있도록 기류 환경을 설정해야 합니다.만약 당신이 아직 이렇게 하지 않았다면, 우리는 이것을 찾을 것이다 article to be one of our personal favorites.

왜 기류야?


기류를 왜 사용하는지 물어볼 수도 있겠지?공기 흐름은 무미건조하고 불필요한 수동 작업을 자동화하고 관리함으로써 많은 문제를 해결하는 데 도움을 준다.
정의에 따르면 Apache Airflow는 프로그래밍 방식으로 워크플로우를 작성, 스케줄링, 모니터링하는 플랫폼으로 DAG라고도 부른다(참조Github.
Airflow를 사용하여 데이터베이스 백업 및 코드/구성 배포 스케줄링을 포함하여 ETL, 머신러닝 파이핑 및 일반 작업 스케줄링을 작성할 수 있습니다.
우리는 사용Airflow in our comparison of Airflow and Luigi의 몇 가지 장점을 토론했다.

공기 흐름 파이프 이해


기류 파이프는 본질적으로 파이썬(Python)으로 작성된 매개변수 그룹으로 기류 방향 비순환도(DAG) 객체를 정의합니다.작업 흐름 중의 각종 임무는 하나의 그림을 형성하는데, 이 그림은 방향이 정해져 있으며, 임무는 질서정연하기 때문이다.무한 순환에 빠지지 않도록 이 그림은 아무런 순환도 없기 때문에 비순환적이다.
예를 들어, Foo, Bar, FooBarFoo라는 세 가지 임무가 있는 경우 Bar 가 먼저 실행될 수 있으며, FooBarFooAIRFLOW_HOME 에 따라 수행됩니다.
그러면 다음과 같은 기본 도면이 생성됩니다.보시다시피 명확한 길이 있습니다.지금 상상해 보세요. 이것은 수백 개의 임무가 필요합니다.이러한 임무의 운행 방식과 순서에 대해 명확한 구조가 매우 중요하다.

이 기본적인 설명이 있으면 첫 번째 DAG를 만듭니다.
위의 링크에 따라 기류를 설정하면, owner 변수를 폴더로 가리키는 디렉터리가 설정되어 있어야 합니다.기본적으로 이것은 에어플로우라는 폴더일 것입니다.이 폴더에서 DAGs 폴더를 만들어야 합니다.다음과 같이 DAGs 폴더에 첫 번째 DAG를 만들려고 합니다.
airflow                  # airflow root directory.
├── dags                 # the dag root folder
│   ├── first_dag.py        # where you put your first task

기본 매개변수 설정


이 문제를 해결하려면 워크플로우의 모든 작업에 적용되는 모든 매개 변수를 포함하는Python 사전을 만들어야 합니다.다음 코드를 볼 때 기본 매개변수 중 일부는 (기본적으로 DAG 소유자의 이름일 뿐) 및 작업의 start_date (첫 번째 DAG 작업의 실행 날짜 확인) 입니다.
기류는 증량 운행과 역사 운행을 처리할 수 있다.때때로 너는 단지 업무 절차를 안배하고 싶지 않을 뿐, 단지 오늘의 임무를 실행하고 싶을 뿐이다.다음 첫 번째 코드 세그먼트에 설정된 작업을 과거 어느 날(예: 하루 전)부터 실행하기를 원할 수도 있습니다.

default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
  #With this set to true, the pipeline won't run if the previous day failed
'email': ['[email protected]'],
'email_on_failure': True,
 #upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}

이 예에서 start_date 은 하루 전이다.당신의 첫 번째 DAG는 어제의 데이터를 실행한 다음 그 후의 어느 날에도 실행될 것입니다.
다음은 기타 관건적인 매개 변수들이다.
코드의
  • end_date 은 마지막 실행 날짜를 확정합니다.종료 날짜를 지정하면 기류가 이 날짜를 초과하는 것을 제한할 수 있습니다.만약 네가 이 종료 날짜를 바꾸지 않는다면, 기류는 영원히 운행할 것이다.
  • depends_on_past는 부울 값입니다.이를true로 설정하면 현재 실행 중인 테스트 실례는 이전 작업의 상태에 의존합니다.예를 들어, 이 매개 변수를true로 설정하면 이 예에서 매일 작업 흐름입니다.만약 어제의 작업이 실행에 실패했다면, 이틀 동안의 작업은 전날의 상태에 달려 있기 때문에 촉발되지 않을 것이다.
  • email가 바로 이메일 알림을 받은 곳입니다.프로필에 개인 이메일을 설정할 수 있습니다.
  • email on failure는 장애가 발생했을 때 알림을 받을지 여부를 정의하는 데 사용됩니다.
  • email on retry는 재시도할 때마다 이메일을 받을 것인지 여부를 정의하는 데 사용됩니다.
  • retries 기류 재시도 실패 작업 횟수
  • retry-delay는 연속 재시도 사이의 지속 시간입니다.
  • 이 예에서 기류는 5분마다 다시 시도될 것이다.
    고품질의 작업 흐름은 고장에 대해 경보/보고를 할 수 있어야 한다. 이것은 우리가 이 단계에서 실현해야 할 관건적인 목표 중의 하나이다.기류는 이 지역의 인코딩을 간소화하기 위해 특별히 설계되었다.이 경우 이메일로 장애 메시지를 보내는 것이 도움이 될 수 있습니다.

    DAG 계획 구성


    이 단계는 DAG의 이름을 지정하고 기본 매개 변수를 DAG에 전달하는 방법을 실례화하는 것입니다. default_args=default_args.
    그런 다음 DAG를 트리거하고 실행하는 빈도를 지정하도록 스케줄링 간격을 설정합니다.이런 상황에서 그것은 단지 매일 한 번일 뿐이다.
    다음은 DAG를 설정하는 방법입니다.
    dag = DAG(
    'basic_dag_1',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    )
    
    
    매일 계획을 실행하려면 다음 코드 매개 변수를 사용하십시오schedule_interval='@daily'.또는cron을 사용할 수 있습니다. 다음과 같습니다. schedule_interval='0 0 * * *'.

    모든 작업 나열


    다음 예제에서 우리는 세 가지 임무 사용PythonOperator, DummyOperator, BashOperator을 가지고 있다.
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import PythonOperator
    
    from datetime import datetime
    
    def my_func():
        print('Hello from my_func')
    
    
    bashtask = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
    )
    
    dummy_task  = DummyOperator(task_id='dummy_task', retries=3)
    
    python_task = PythonOperator(task_id='python_task', python_callable=my_func)
    
    
    이 임무들은 모두 매우 간단하다.각각의 함수가 다르기 때문에 서로 다른 매개 변수가 필요하다는 것을 알 수 있습니다.DummyOperator는 빈 조작부호일 뿐입니다. 파이프가 완성되었음을 표시하는 것 외에 실제로는 아무것도 하지 않는 단계를 만들 수 있습니다.PythonOperator 파이썬 함수를 호출할 수 있고 매개 변수를 전달할 수도 있습니다.BashOperator bash 명령을 호출할 수 있습니다.
    다음은 임무만 쓰겠습니다.이 작업은 모든 부품을 함께 추가해야 적용됩니다.
    이 기본 작업을 사용하면 의존 항목, 즉 작업 수행 순서를 정의할 수 있습니다.

    의존 항목 정의


    임무 간의 의존 관계를 정의할 수 있는 두 가지 방법이 있다.
    첫 번째 방법은 사용set_downstreamset_upstream이다.이 경우 set_upstream 를 사용하여 python_task BASH 작업에 의존하거나 다운스트림 버전에 대해 동일한 작업을 수행할 수 있습니다.
    # This means that will depend on bashtask
    # running successfully to run.
    bashtask.set_upstream(python_task)
    # similar to above where dummy_task will depend on bashtask
    dummy_task.set_downstream(bashtask)
    
    이 기본 설정을 사용하면 BASH 작업이 성공하면 Python 작업이 실행됩니다.이와 유사하게 dummy_task BASH 임무의 완성에 달려 있다.
    의존 관계를 정의하는 두 번째 방법은 위치 이동 연산자를 사용하는 것이다.비트 이동 연산자에 익숙하지 않은 사람에게는 >> 또는 <처럼 보입니다.
    예를 들어, BASH 작업에 의존하는 Python 작업을 참조하려면 bashtask >> python_task 로 작성할 수 있습니다.
    지금, 만약 당신이 몇 가지 임무를 하나에 의존한다면?
    그리고 너는 그것들을 하나의 목록으로 열거할 수 있다.이러한 상황에서 Python 임무와 dummy_task  모두 BASH 임무에 의존하고 BASH 임무가 완성된 후에 병행 수행한다.set_downstream 메서드나 비트 이동 연산자를 사용할 수 있습니다.bashtask.set_downstream([python_task, dummy_task])

    첫 번째 공기 흐름 파이프


    지금 우리는 이미 모든 다른 부분을 복습했으니, 우리는 그것들을 함께 놓을 수 있다.다음은 첫 번째 기본 기류 파이프입니다.
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import PythonOperator
    from datetime import datetime, timedelta
    
    default_args = {
    'owner': 'default_user',
    'start_date': airflow.utils.dates.days_ago(1),
    'depends_on_past': True,
      #With this set to true, the pipeline won't run if the previous day failed
    'email': ['[email protected]'],
    'email_on_failure': True,
     #upon failure this pipeline will send an email to your email set above
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=30),
    }
    
    dag = DAG(
    'basic_dag_1',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    )
    
    def my_func():
        print('Hello from my_func')
    
    
    bashtask = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
    )
    
    dummy_task  = DummyOperator(task_id='dummy_task', retries=3)
    
    python_task = PythonOperator(task_id='python_task', python_callable=my_func)
    
    dummy_task.set_downstream(bashtask)
    python_task.set_downstream(bashtask)
    
    

    DAG 공기 흐름 스케줄러 추가


    Airflow 데이터베이스를 초기화했다면 웹 서버를 사용하여 새 DAG에 추가할 수 있습니다.다음 명령을 사용하여 파이프에 추가할 수 있습니다.

    airflow webserver
    airflow scheduler


    최종 결과는 다음과 같이 공기 흐름 대시보드에 표시됩니다.

    좋은 웹페이지 즐겨찾기