Apache Airflow를 사용하여 DAG 파일에 로그인

이 보도에 관하여


이 글은 다음 글의 후속 내용이다.
  • 공식 Docker 이미지로 Apache Airflow 시작
  • 이번에는 Apache Airflow에서 DAG 파일(Python 스크립트)을 인식해 실행합니다.

    이른바 DAG


    DAG는 Direct acyclic graph의 약칭으로 일본어로 바꾸면 유방향 무폐쇄 도표다.
    DAG는 지향도표이기 때문에 한 방향만 있고 정보는 앞으로 흐를 수 있으며 무폐합 곡선도이기 때문에 시작 노드로 돌아가는 상반된 길은 없다.
    유방향 그래프(Directed graphis)와 무방향 그래프(Underected graphis)의 관련은 여기에 포함되지 않습니다.
    DAG를 깊이 이해하려면 먼저 도표 데이터 구조 등을 배우는 것이 좋다.
    DAG의 장점은 다음과 같습니다.

  • 동적 프레임워크: 코드로 구성 가능

  • 확장: 다양한 유형의 작업 수행 지원(Python/Bash/SQL/Docker/AWS/Azure/GCP etc.)

  • 확장성: 무제한 작업 수행(작업 노드)
  • Airflow 및 DAG


    Airflow의 모든 임무는 DAG에서 정의해야 합니다.즉, 처리의 실행 순서를 DAG 형식으로 정의해야 합니다.
    DAG와 관련된 모든 구성은 Python 확장 기능인 DAG의 정의 파일로 정의됩니다.
    DAG의 정의 파일에는 장애 발생 시 메일 발송, 작업 시작, 종료 시간, 재시도 횟수 등 모든 의존 관계와 설정 파라미터가 포함되어 있습니다.또한 임무 의존 관계, 서열 등 모든 임무를 정의해야 한다.

    연산자


    DAG의 정의 파일에는 여러 개의 작업이 포함될 수 있지만 각 작업의 성격도 다를 수 있습니다.예를 들어 한 임무는 파이썬 스크립트이고, 다른 임무는 조개 스크립트, SQL, 스파크 작업 등이다.
    이러한 작업은 DAG 정의 파일 없이 Operator(연산자)를 사용하여 정의되며, Airflow는 다양한 유형의 작업에 다양한 연산자를 제공합니다.
    에어플로우가 다양한 임무를 수행할 수 있는 유연성을 갖기 때문에 다른 스케줄러에 비해 강점이 있다.

    스케줄러의 시동


    작업을 수행하려면 Airflow 스케줄러를 시작해야 합니다.
    저번 보도 환경을 구축할 때 스케줄러가 실행되지 않았기 때문에 이번에 이곳에서 스케줄러를 가동합니다.
    2021/01/06 보완 내용을 실시한 상황에서 스케줄러가 시작되었고 건너뛰어도 OK입니다.docker ps에서 컨테이너 ID를 확인하고 에어플로우의 컨테이너에 대한 베이스 세션을 시작한다.
    Bash 세션이 시작되면 CLI를 사용하여 스케줄러를 시작합니다.
    다음 명령을 실행하여 백그라운드에서 스케줄러를 시작합니다.
    airflow scheduler -D
    

    DAG 파일 생성


    그럼 DAG 파일 제작을 시작하겠습니다.DAG 파일은 파이썬 확장 기능처럼 파이썬 스크립트를 사용합니다.
    DAG는 작업과 관련된 모든 세부 내용을 포함하는 정의를 기술하고 의존 관계를 정의해야 한다.일반적으로 스크립트에서 DAG를 실행하는 데 필요한 단계는 다음과 같습니다.
  • 필요한 라이브러리 가져오기
  • 기본 매개 변수의 정의
  • DAG의 제작
  • 임무의 성명
  • 의존 관계에 대한 설명
  • 먼저 임의의 위치에 DAG 파일을 생성합니다.이번에는 sample_dag.py라는 파이톤 스크립트의 파일을 만들었다.
    이 파일에서 상술한 절차에 따라 코드를 기술한다.

    원하는 라이브러리 가져오기


    Airflow 실행에 필요한 모든 프로그램 라이브러리를 가져옵니다.
    일반적으로 datetime, 演算子(Python/Bash 등), 에어플로우 자체 등이다.
    sample_dag.py
    from datetime import timedelta
    import airflow
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    

    기본 매개변수 정의


    Airflow에서 작업을 수행하는 데 필요한 DAG의 기본 매개 변수를 정의합니다.
    sample_dag.py(계속)
    args = {
        'owner': 'Pramod', # DAG の所有者  
        'start_date': airflow.utils.dates.days_ago(3), # タスクの開始日時
        'depends_on_past': False,
        'email': ['[email protected]'], # 障害発生時などにメール送信を行う宛先
        'email_on_failure': False, # タスク失敗時にメールを送信するか否か
        'email_on_retry': False, # タスクのリトライが発生した際にメールを送信するか否か
        'retries': 1, # タスク失敗時のリトライ回数
        'retry_delay': timedelta(minutes=5), # タスクが失敗してからリトライが行われるまでの待ち時間
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2021, 12, 31), # タスクの終了日時
        # 'wait_for_downstream': False,
        # 'dag': dag,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    }
    

    DAG 생성


    기본 매개변수를 정의한 후 DAG 자체를 생성합니다.그러면 DAG 호스트의 이름과 설명, 작업을 수행할 간격이 정의됩니다.
    sample_dag.py(계속)
    dag = DAG(
        'sample_airflow_dag', # DAG の名前
        default_args=args, # DAG のデフォルト引数
        description='簡単な DAG テスト', # DAG の説明
        schedule_interval=timedelta(days=1), # タスクの実行間隔
        # start_date=airflow.utils.dates.days_ago(3), # ここでも指定可能
        tags=['example']
    )
    

    임무 선언


    DAG 자체를 작성한 후 실제로 수행된 작업(작업)을 선언합니다.
    이전 단계에서 작성한 DAG의 일부로 수행할 모든 작업을 선언합니다.
    sample_dag.py(계속)
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag,
    )
    
    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
        dag=dag,
    )
    dag.doc_md = __doc__
    
    t1.doc_md = """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
    templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    
    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag,
    )
    
    상기 템플릿에서 작업의 일부분을 사용했다Jinja.
    Jinja는 Django의 템플릿을 모델로 하고 Python은 현대 디자이너의 우호적인 템플릿 언어를 사용합니다. ※공식 사이트 에서 발췌 하다

    종속 관계 기록


    마지막으로 작업의 실행 순서 (의존 관계) 를 정의합니다.작업은 병렬 작업과 순차 작업 중 임의로 정의됩니다.
    몇 가지 임무의 정의 방법이 있지만 여기는 간단하다t1 タスク完了後に t2 および t3 タスクを実行.
    sample_dag.py(계속)
    t1 >> [t2, t3]
    
    어떤 정의 방법이 있는지 참고하십시오여기..

    DAG 파일 구성


    DAG 파일을 만들었기 때문에 에어플로우 환경에 뒀다.
    DAG 파일 배치 위치의 디렉토리는 Airflow$AIRFLOW_HOME/airflow.cfg 파일에 설정되어 있습니다.

    airflow.cfg
    [core]
    # The folder where your airflow pipelines live, most likely a
    # subfolder in a code repository. This path must be absolute.
    dags_folder = /opt/airflow/dags
    
    기본값에 설정되어 있어야 합니다$AIRFLOW_HOME/dags.이 경로는 임의로 변경할 수 있다.$AIRFLOW_HOME 아래에 dags 폴더가 없으면 폴더를 만듭니다.dags_folder 지정된 디렉토리에 방금 작성한 DAG 파일sample_dag.py 파일을 구성합니다.

    파일을 구성하면 다음 에어플로우의 ScheduulerJob이 완료되면 에어플로우의 화면에서 DAG 정보를 확인할 수 있다.
    대체로 기본 설정이라면 프로필을 구성한 후 최장 5분 정도가 걸려야 화면에 나타난다.

    이렇게 하면 DAG 작업이 지정된 시간, 간격으로 수행됩니다.

    작업 수동 실행


    실제 작업이 정상적으로 끝났는지 여부는 즉석에서 수동으로 DAG 작업을 수행할 수도 있습니다.
    DAGs 화면에 나타나는 DAG 오른쪽의 Action에서 Trigger DAG 버튼을 선택합니다.

    DAG 트리거가 실행되면 Configuration JSON 화면이 표시됩니다.
    이번sample_dag.py의 경우 특별한 입력이 필요하지 않기 때문에 직접 선택Trigger한다.
    이렇게 하면 DAG는 수동으로 실행할 수 있다.

    DAGs 화면으로 돌아갈 때 Runs 표시줄의 표시가 runnning로 바뀌었는지 확인하십시오.
    DAG 작업이 실제로 수행되었는지 확인할 수 있습니다.

    총결산


    이번에는 아파치 에어플로우에서 DAG 작업을 수행하는 데 필요한 것DAG ファイルの作成DAG ファイルの配置에 대해 설명했다.
    아파치 에어플로우에서는 다양한 미션을 유연하게 수행할 수 있으니 꼭 시도해 보세요.

    참조 정보

  • Apache Airflow Documentation: Tutorial
  • 참고서


  • Learn PySpark: Build Python-based Machine Learning and Deep Learning Models - Singh, Pramod @2019
  • 좋은 웹페이지 즐겨찾기