Apache Airflow를 사용하여 DAG 파일에 로그인
이 보도에 관하여
이 글은 다음 글의 후속 내용이다.
이른바 DAG
DAG는
Direct acyclic graph
의 약칭으로 일본어로 바꾸면 유방향 무폐쇄 도표다.DAG는 지향도표이기 때문에 한 방향만 있고 정보는 앞으로 흐를 수 있으며 무폐합 곡선도이기 때문에 시작 노드로 돌아가는 상반된 길은 없다.
유방향 그래프(Directed graphis)와 무방향 그래프(Underected graphis)의 관련은 여기에 포함되지 않습니다.
DAG를 깊이 이해하려면 먼저 도표 데이터 구조 등을 배우는 것이 좋다.
DAG의 장점은 다음과 같습니다.
동적 프레임워크: 코드로 구성 가능
확장: 다양한 유형의 작업 수행 지원(Python/Bash/SQL/Docker/AWS/Azure/GCP etc.)
확장성: 무제한 작업 수행(작업 노드)
Airflow 및 DAG
Airflow의 모든 임무는 DAG에서 정의해야 합니다.즉, 처리의 실행 순서를 DAG 형식으로 정의해야 합니다.
DAG와 관련된 모든 구성은 Python 확장 기능인 DAG의 정의 파일로 정의됩니다.
DAG의 정의 파일에는 장애 발생 시 메일 발송, 작업 시작, 종료 시간, 재시도 횟수 등 모든 의존 관계와 설정 파라미터가 포함되어 있습니다.또한 임무 의존 관계, 서열 등 모든 임무를 정의해야 한다.
연산자
DAG의 정의 파일에는 여러 개의 작업이 포함될 수 있지만 각 작업의 성격도 다를 수 있습니다.예를 들어 한 임무는 파이썬 스크립트이고, 다른 임무는 조개 스크립트, SQL, 스파크 작업 등이다.
이러한 작업은 DAG 정의 파일 없이 Operator(연산자)를 사용하여 정의되며, Airflow는 다양한 유형의 작업에 다양한 연산자를 제공합니다.
에어플로우가 다양한 임무를 수행할 수 있는 유연성을 갖기 때문에 다른 스케줄러에 비해 강점이 있다.
스케줄러의 시동
작업을 수행하려면 Airflow 스케줄러를 시작해야 합니다.
저번 보도 환경을 구축할 때 스케줄러가 실행되지 않았기 때문에 이번에 이곳에서 스케줄러를 가동합니다.
2021/01/06 보완 내용을 실시한 상황에서 스케줄러가 시작되었고 건너뛰어도 OK입니다.
docker ps
에서 컨테이너 ID를 확인하고 에어플로우의 컨테이너에 대한 베이스 세션을 시작한다.Bash 세션이 시작되면 CLI를 사용하여 스케줄러를 시작합니다.
다음 명령을 실행하여 백그라운드에서 스케줄러를 시작합니다.
airflow scheduler -D
DAG 파일 생성
그럼 DAG 파일 제작을 시작하겠습니다.DAG 파일은 파이썬 확장 기능처럼 파이썬 스크립트를 사용합니다.
DAG는 작업과 관련된 모든 세부 내용을 포함하는 정의를 기술하고 의존 관계를 정의해야 한다.일반적으로 스크립트에서 DAG를 실행하는 데 필요한 단계는 다음과 같습니다.
sample_dag.py
라는 파이톤 스크립트의 파일을 만들었다.이 파일에서 상술한 절차에 따라 코드를 기술한다.
원하는 라이브러리 가져오기
Airflow 실행에 필요한 모든 프로그램 라이브러리를 가져옵니다.
일반적으로
datetime
, 演算子
(Python/Bash 등), 에어플로우 자체 등이다.sample_dag.py
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
기본 매개변수 정의
Airflow에서 작업을 수행하는 데 필요한 DAG의 기본 매개 변수를 정의합니다.
sample_dag.py(계속)
args = {
'owner': 'Pramod', # DAG の所有者
'start_date': airflow.utils.dates.days_ago(3), # タスクの開始日時
'depends_on_past': False,
'email': ['[email protected]'], # 障害発生時などにメール送信を行う宛先
'email_on_failure': False, # タスク失敗時にメールを送信するか否か
'email_on_retry': False, # タスクのリトライが発生した際にメールを送信するか否か
'retries': 1, # タスク失敗時のリトライ回数
'retry_delay': timedelta(minutes=5), # タスクが失敗してからリトライが行われるまでの待ち時間
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2021, 12, 31), # タスクの終了日時
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
DAG 생성
기본 매개변수를 정의한 후 DAG 자체를 생성합니다.그러면 DAG 호스트의 이름과 설명, 작업을 수행할 간격이 정의됩니다.
sample_dag.py(계속)
dag = DAG(
'sample_airflow_dag', # DAG の名前
default_args=args, # DAG のデフォルト引数
description='簡単な DAG テスト', # DAG の説明
schedule_interval=timedelta(days=1), # タスクの実行間隔
# start_date=airflow.utils.dates.days_ago(3), # ここでも指定可能
tags=['example']
)
임무 선언
DAG 자체를 작성한 후 실제로 수행된 작업(작업)을 선언합니다.
이전 단계에서 작성한 DAG의 일부로 수행할 모든 작업을 선언합니다.
sample_dag.py(계속)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
dag=dag,
)
dag.doc_md = __doc__
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag,
)
상기 템플릿에서 작업의 일부분을 사용했다Jinja.Jinja는 Django의 템플릿을 모델로 하고 Python은 현대 디자이너의 우호적인 템플릿 언어를 사용합니다. ※공식 사이트 에서 발췌 하다
종속 관계 기록
마지막으로 작업의 실행 순서 (의존 관계) 를 정의합니다.작업은 병렬 작업과 순차 작업 중 임의로 정의됩니다.
몇 가지 임무의 정의 방법이 있지만 여기는 간단하다
t1 タスク完了後に t2 および t3 タスクを実行
.sample_dag.py(계속)
t1 >> [t2, t3]
어떤 정의 방법이 있는지 참고하십시오여기..DAG 파일 구성
DAG 파일을 만들었기 때문에 에어플로우 환경에 뒀다.
DAG 파일 배치 위치의 디렉토리는 Airflow
$AIRFLOW_HOME/airflow.cfg
파일에 설정되어 있습니다.airflow.cfg
[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /opt/airflow/dags
기본값에 설정되어 있어야 합니다$AIRFLOW_HOME/dags
.이 경로는 임의로 변경할 수 있다.$AIRFLOW_HOME
아래에 dags
폴더가 없으면 폴더를 만듭니다.dags_folder
지정된 디렉토리에 방금 작성한 DAG 파일sample_dag.py
파일을 구성합니다.파일을 구성하면 다음 에어플로우의 ScheduulerJob이 완료되면 에어플로우의 화면에서 DAG 정보를 확인할 수 있다.
대체로 기본 설정이라면 프로필을 구성한 후 최장 5분 정도가 걸려야 화면에 나타난다.
이렇게 하면 DAG 작업이 지정된 시간, 간격으로 수행됩니다.
작업 수동 실행
실제 작업이 정상적으로 끝났는지 여부는 즉석에서 수동으로 DAG 작업을 수행할 수도 있습니다.
DAGs 화면에 나타나는 DAG 오른쪽의
Action
에서 Trigger DAG
버튼을 선택합니다.DAG 트리거가 실행되면 Configuration JSON 화면이 표시됩니다.
이번
sample_dag.py
의 경우 특별한 입력이 필요하지 않기 때문에 직접 선택Trigger
한다.이렇게 하면 DAG는 수동으로 실행할 수 있다.
DAGs 화면으로 돌아갈 때
Runs
표시줄의 표시가 runnning
로 바뀌었는지 확인하십시오.DAG 작업이 실제로 수행되었는지 확인할 수 있습니다.
총결산
이번에는 아파치 에어플로우에서 DAG 작업을 수행하는 데 필요한 것
DAG ファイルの作成
과 DAG ファイルの配置
에 대해 설명했다.아파치 에어플로우에서는 다양한 미션을 유연하게 수행할 수 있으니 꼭 시도해 보세요.
참조 정보
참고서
Learn PySpark: Build Python-based Machine Learning and Deep Learning Models - Singh, Pramod @2019
Reference
이 문제에 관하여(Apache Airflow를 사용하여 DAG 파일에 로그인), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://zenn.dev/ymasaoka/articles/register-dag-with-apache-airflow텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)