[Airflow] 예제 실습
개념 정리
1. Operator
2. Task
3. DAG
VirtualBOX에서 jupyter notebook 실행
jupyter notebook --ip= local ip주소
Example Pipeline definition
- basic pipeline definition example
- airflow tutorial.py : https://airflow.apache.org/docs/apache-airflow/2.1.4/tutorial.html
- 이 분 블로그 보면서 이해 + 정리
라이브러리 importing
%%writefile ./dags/tutorial.py
from datetime import timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
기본 매개변수 정의
작업 생성시 사용할 기본 매개변수를 Dictionary 타입으로 정의
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
DAG 객체
작업을 연결할 DAG 객체 생성
- tutorial
- 객체에 많은 dag 구별할 dag_id 필요 => dag_id 문자열 정의
- default_args=default_args
- argument dictionary 추가
- schedule_interval=timedelta(days=1)
- schedule_interval 정의 (dag 실행 간격..?)
with DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=days_ago(2), # => 이해 안감
tags=['example'],
) as dag:
Task 인스턴스 생성
작업 인스턴스 생성
- 클래스로 생성한 객체 = 인스턴스
- 작업(Tasks) = Operator의 인스턴스
- 작업의 고유한 식별자 = task_id
- 작업의 첫 번째 argument로 추가함
- 작업은 반드시 task_id && owner argument를 포함하거나 상속받아야 함 !!!
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
)
Airflow & Jinja Template
- Airflow Jinja Templating, parameter, macro 등 지원함
- Airflow에서 Jinja tempaltes 사용하기 예제 : https://dydwnsekd.tistory.com/62
- airflow에서 제공하는 marco 목록 : https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
- 아래 코드 : templated_command 예시
t1.doc_md = dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beggining of the DAG
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
)
job 하나 더 추가
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
)
의존관계 정의
- 작업들간 의존관계 정의
- [작업 메소드]
set_downstream & set_upstream
- [연산자]
>> & <<
t1 >> [t2, t3]
t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
파이프라인 그리기
- DAG의 의존성 정보 확인
airflow tasks list tutorial --tree
- 작업 인스턴스 테스트
airflow tasks test
명령어- tasks 테스트
- 인스턴스를 로컬에서 실행
- 의존 관계 무관 동작
- db에 상태를 전달 x
airflow tasks test tutorial print_date 2022-01-01
이렇게 나오면 성공!
- DAG 테스트
airflow dags test [dag_id] [execution_date]
명령어- 의존 관계에 따라 작업 수행
- db에 상태 전달 x
airflow dags test tutorial 2022-01-01
이렇게 나오면 성공
- Backfill
Backfill 이란?
- 스케쥴러가 과거~현재 까지 실행되었어야 하는 dag들을 모두 실행하는 작업이
Backfill
이다.
- 스케쥴러가 과거~현재 까지 실행되었어야 하는 dag들을 모두 실행하는 작업이
- 의존관계에 따라 작업 진행 + 파일에 로그를 남기며 db에 상태 정보를 저장한다
- start_date와 end_date 명시해 작업 인스턴스가 스케줄에 맞춰 수행할 수 있도록 함
# start your backfill on a date range
airflow dags backfill tutorial \
--start-date 2022-01-01 \
--end-date 2022-01-07
이렇게 나오면 성공
- finished run 7 of 7
- succeeded 21
- Backfill done Exiting.
[2022-01-29 18:42:11,902] {backfill_job.py:388} INFO - [backfill progress] | finished run 7 of 7 | tasks waiting: 0 | succeeded: 21 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2022-01-29 18:42:12,119] {local_executor.py:386} INFO - Shutting down LocalExecutor; waiting for running tasks to finish. Signal again if you don't want to wait.
[2022-01-29 18:42:13,864] {backfill_job.py:831} INFO - Backfill done. Exiting.
참고 실습 : https://www.comtec.kr/2021/08/09/airflow-tutorial/
airflow 개념 참고 : https://jwon.org/airflow-tips/
실습 2
https://airflow.apache.org/docs/apache-airflow/2.1.4/tutorial_taskflow_api.html
Author And Source
이 문제에 관하여([Airflow] 예제 실습), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@yje876/Airflow-예제-실습저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)