[Airflow] 예제 실습

11678 단어 airflowairflow

개념 정리

1. Operator

2. Task

3. DAG

VirtualBOX에서 jupyter notebook 실행

jupyter notebook --ip= local ip주소

Example Pipeline definition

라이브러리 importing

%%writefile ./dags/tutorial.py

from datetime import timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

기본 매개변수 정의

작업 생성시 사용할 기본 매개변수를 Dictionary 타입으로 정의

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

DAG 객체

작업을 연결할 DAG 객체 생성

  • tutorial
    • 객체에 많은 dag 구별할 dag_id 필요 => dag_id 문자열 정의
  • default_args=default_args
    • argument dictionary 추가
  • schedule_interval=timedelta(days=1)
    • schedule_interval 정의 (dag 실행 간격..?)
with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2), # => 이해 안감 
    tags=['example'],
) as dag:

Task 인스턴스 생성

작업 인스턴스 생성

  • 클래스로 생성한 객체 = 인스턴스
  • 작업(Tasks) = Operator의 인스턴스
  • 작업의 고유한 식별자 = task_id
    • 작업의 첫 번째 argument로 추가함
    • 작업은 반드시 task_id && owner argument를 포함하거나 상속받아야 함 !!!
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )

Airflow & Jinja Template

    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)

    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beggining of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )

job 하나 더 추가

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

의존관계 정의

  • 작업들간 의존관계 정의
    • [작업 메소드]
    • set_downstream & set_upstream
    • [연산자]
    • >> & <<
    t1 >> [t2, t3]

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

파이프라인 그리기

  1. DAG의 의존성 정보 확인
    airflow tasks list tutorial --tree
  2. 작업 인스턴스 테스트
    airflow tasks test 명령어
    • tasks 테스트
    • 인스턴스를 로컬에서 실행
    • 의존 관계 무관 동작
    • db에 상태를 전달 x
      airflow tasks test tutorial print_date 2022-01-01
      이렇게 나오면 성공!

  3. DAG 테스트
    airflow dags test [dag_id] [execution_date] 명령어
    • 의존 관계에 따라 작업 수행
    • db에 상태 전달 x
      airflow dags test tutorial 2022-01-01
      이렇게 나오면 성공
  4. Backfill
    • Backfill 이란?
      • 스케쥴러가 과거~현재 까지 실행되었어야 하는 dag들을 모두 실행하는 작업이 Backfill 이다.
    • 의존관계에 따라 작업 진행 + 파일에 로그를 남기며 db에 상태 정보를 저장한다
    • start_date와 end_date 명시해 작업 인스턴스가 스케줄에 맞춰 수행할 수 있도록 함
  # start your backfill on a date range
  airflow dags backfill tutorial \
      --start-date 2022-01-01 \
      --end-date 2022-01-07

이렇게 나오면 성공

  • finished run 7 of 7
  • succeeded 21
  • Backfill done Exiting.
[2022-01-29 18:42:11,902] {backfill_job.py:388} INFO - [backfill progress] | finished run 7 of 7 | tasks waiting: 0 | succeeded: 21 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2022-01-29 18:42:12,119] {local_executor.py:386} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
[2022-01-29 18:42:13,864] {backfill_job.py:831} INFO - Backfill done. Exiting.

참고 실습 : https://www.comtec.kr/2021/08/09/airflow-tutorial/
airflow 개념 참고 : https://jwon.org/airflow-tips/


실습 2

https://airflow.apache.org/docs/apache-airflow/2.1.4/tutorial_taskflow_api.html

좋은 웹페이지 즐겨찾기