첫 번째 공기 흐름 파이프를 작성하는 방법
시작하기 전에 본고에서 논의한 모든 절차를 따를 수 있도록 기류 환경을 설정해야 합니다.만약 당신이 아직 이렇게 하지 않았다면, 우리는 이것을 찾을 것이다 article to be one of our personal favorites.
왜 기류야?
기류를 왜 사용하는지 물어볼 수도 있겠지?공기 흐름은 무미건조하고 불필요한 수동 작업을 자동화하고 관리함으로써 많은 문제를 해결하는 데 도움을 준다.
정의에 따르면 Apache Airflow는 프로그래밍 방식으로 워크플로우를 작성, 스케줄링, 모니터링하는 플랫폼으로 DAG라고도 부른다(참조Github.
Airflow를 사용하여 데이터베이스 백업 및 코드/구성 배포 스케줄링을 포함하여 ETL, 머신러닝 파이핑 및 일반 작업 스케줄링을 작성할 수 있습니다.
우리는 사용Airflow in our comparison of Airflow and Luigi의 몇 가지 장점을 토론했다.
공기 흐름 파이프 이해
기류 파이프는 본질적으로 파이썬(Python)으로 작성된 매개변수 그룹으로 기류 방향 비순환도(DAG) 객체를 정의합니다.작업 흐름 중의 각종 임무는 하나의 그림을 형성하는데, 이 그림은 방향이 정해져 있으며, 임무는 질서정연하기 때문이다.무한 순환에 빠지지 않도록 이 그림은 아무런 순환도 없기 때문에 비순환적이다.
예를 들어, Foo
, Bar
, FooBar
및 Foo
라는 세 가지 임무가 있는 경우 Bar
가 먼저 실행될 수 있으며, FooBar
및 Foo
은 AIRFLOW_HOME
에 따라 수행됩니다.
그러면 다음과 같은 기본 도면이 생성됩니다.보시다시피 명확한 길이 있습니다.지금 상상해 보세요. 이것은 수백 개의 임무가 필요합니다.이러한 임무의 운행 방식과 순서에 대해 명확한 구조가 매우 중요하다.
이 기본적인 설명이 있으면 첫 번째 DAG를 만듭니다.
위의 링크에 따라 기류를 설정하면, owner
변수를 폴더로 가리키는 디렉터리가 설정되어 있어야 합니다.기본적으로 이것은 에어플로우라는 폴더일 것입니다.이 폴더에서 DAGs 폴더를 만들어야 합니다.다음과 같이 DAGs 폴더에 첫 번째 DAG를 만들려고 합니다.
airflow # airflow root directory.
├── dags # the dag root folder
│ ├── first_dag.py # where you put your first task
기본 매개변수 설정
이 문제를 해결하려면 워크플로우의 모든 작업에 적용되는 모든 매개 변수를 포함하는Python 사전을 만들어야 합니다.다음 코드를 볼 때 기본 매개변수 중 일부는 (기본적으로 DAG 소유자의 이름일 뿐) 및 작업의 start_date
(첫 번째 DAG 작업의 실행 날짜 확인) 입니다.
기류는 증량 운행과 역사 운행을 처리할 수 있다.때때로 너는 단지 업무 절차를 안배하고 싶지 않을 뿐, 단지 오늘의 임무를 실행하고 싶을 뿐이다.다음 첫 번째 코드 세그먼트에 설정된 작업을 과거 어느 날(예: 하루 전)부터 실행하기를 원할 수도 있습니다.
default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
#With this set to true, the pipeline won't run if the previous day failed
'email': ['[email protected]'],
'email_on_failure': True,
#upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}
이 예에서 start_date
은 하루 전이다.당신의 첫 번째 DAG는 어제의 데이터를 실행한 다음 그 후의 어느 날에도 실행될 것입니다.
다음은 기타 관건적인 매개 변수들이다.
코드의
기류 파이프는 본질적으로 파이썬(Python)으로 작성된 매개변수 그룹으로 기류 방향 비순환도(DAG) 객체를 정의합니다.작업 흐름 중의 각종 임무는 하나의 그림을 형성하는데, 이 그림은 방향이 정해져 있으며, 임무는 질서정연하기 때문이다.무한 순환에 빠지지 않도록 이 그림은 아무런 순환도 없기 때문에 비순환적이다.
예를 들어,
Foo
, Bar
, FooBar
및 Foo
라는 세 가지 임무가 있는 경우 Bar
가 먼저 실행될 수 있으며, FooBar
및 Foo
은 AIRFLOW_HOME
에 따라 수행됩니다.그러면 다음과 같은 기본 도면이 생성됩니다.보시다시피 명확한 길이 있습니다.지금 상상해 보세요. 이것은 수백 개의 임무가 필요합니다.이러한 임무의 운행 방식과 순서에 대해 명확한 구조가 매우 중요하다.
이 기본적인 설명이 있으면 첫 번째 DAG를 만듭니다.
위의 링크에 따라 기류를 설정하면,
owner
변수를 폴더로 가리키는 디렉터리가 설정되어 있어야 합니다.기본적으로 이것은 에어플로우라는 폴더일 것입니다.이 폴더에서 DAGs 폴더를 만들어야 합니다.다음과 같이 DAGs 폴더에 첫 번째 DAG를 만들려고 합니다.airflow # airflow root directory.
├── dags # the dag root folder
│ ├── first_dag.py # where you put your first task
기본 매개변수 설정
이 문제를 해결하려면 워크플로우의 모든 작업에 적용되는 모든 매개 변수를 포함하는Python 사전을 만들어야 합니다.다음 코드를 볼 때 기본 매개변수 중 일부는 (기본적으로 DAG 소유자의 이름일 뿐) 및 작업의 start_date
(첫 번째 DAG 작업의 실행 날짜 확인) 입니다.
기류는 증량 운행과 역사 운행을 처리할 수 있다.때때로 너는 단지 업무 절차를 안배하고 싶지 않을 뿐, 단지 오늘의 임무를 실행하고 싶을 뿐이다.다음 첫 번째 코드 세그먼트에 설정된 작업을 과거 어느 날(예: 하루 전)부터 실행하기를 원할 수도 있습니다.
default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
#With this set to true, the pipeline won't run if the previous day failed
'email': ['[email protected]'],
'email_on_failure': True,
#upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}
이 예에서 start_date
은 하루 전이다.당신의 첫 번째 DAG는 어제의 데이터를 실행한 다음 그 후의 어느 날에도 실행될 것입니다.
다음은 기타 관건적인 매개 변수들이다.
코드의
default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
#With this set to true, the pipeline won't run if the previous day failed
'email': ['[email protected]'],
'email_on_failure': True,
#upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}
end_date
은 마지막 실행 날짜를 확정합니다.종료 날짜를 지정하면 기류가 이 날짜를 초과하는 것을 제한할 수 있습니다.만약 네가 이 종료 날짜를 바꾸지 않는다면, 기류는 영원히 운행할 것이다.depends_on_past
는 부울 값입니다.이를true로 설정하면 현재 실행 중인 테스트 실례는 이전 작업의 상태에 의존합니다.예를 들어, 이 매개 변수를true로 설정하면 이 예에서 매일 작업 흐름입니다.만약 어제의 작업이 실행에 실패했다면, 이틀 동안의 작업은 전날의 상태에 달려 있기 때문에 촉발되지 않을 것이다.email
가 바로 이메일 알림을 받은 곳입니다.프로필에 개인 이메일을 설정할 수 있습니다.email on failure
는 장애가 발생했을 때 알림을 받을지 여부를 정의하는 데 사용됩니다.email on retry
는 재시도할 때마다 이메일을 받을 것인지 여부를 정의하는 데 사용됩니다.retries
기류 재시도 실패 작업 횟수retry-delay
는 연속 재시도 사이의 지속 시간입니다.고품질의 작업 흐름은 고장에 대해 경보/보고를 할 수 있어야 한다. 이것은 우리가 이 단계에서 실현해야 할 관건적인 목표 중의 하나이다.기류는 이 지역의 인코딩을 간소화하기 위해 특별히 설계되었다.이 경우 이메일로 장애 메시지를 보내는 것이 도움이 될 수 있습니다.
DAG 계획 구성
이 단계는 DAG의 이름을 지정하고 기본 매개 변수를 DAG에 전달하는 방법을 실례화하는 것입니다. default_args=default_args
.
그런 다음 DAG를 트리거하고 실행하는 빈도를 지정하도록 스케줄링 간격을 설정합니다.이런 상황에서 그것은 단지 매일 한 번일 뿐이다.
다음은 DAG를 설정하는 방법입니다.
dag = DAG(
'basic_dag_1',
default_args=default_args,
schedule_interval=timedelta(days=1),
)
매일 계획을 실행하려면 다음 코드 매개 변수를 사용하십시오schedule_interval='@daily'
.또는cron을 사용할 수 있습니다. 다음과 같습니다. schedule_interval='0 0 * * *'
.
모든 작업 나열
다음 예제에서 우리는 세 가지 임무 사용PythonOperator
, DummyOperator
, BashOperator
을 가지고 있다.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def my_func():
print('Hello from my_func')
bashtask = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
dummy_task = DummyOperator(task_id='dummy_task', retries=3)
python_task = PythonOperator(task_id='python_task', python_callable=my_func)
이 임무들은 모두 매우 간단하다.각각의 함수가 다르기 때문에 서로 다른 매개 변수가 필요하다는 것을 알 수 있습니다.DummyOperator
는 빈 조작부호일 뿐입니다. 파이프가 완성되었음을 표시하는 것 외에 실제로는 아무것도 하지 않는 단계를 만들 수 있습니다.PythonOperator
파이썬 함수를 호출할 수 있고 매개 변수를 전달할 수도 있습니다.BashOperator
bash 명령을 호출할 수 있습니다.
다음은 임무만 쓰겠습니다.이 작업은 모든 부품을 함께 추가해야 적용됩니다.
이 기본 작업을 사용하면 의존 항목, 즉 작업 수행 순서를 정의할 수 있습니다.
의존 항목 정의
임무 간의 의존 관계를 정의할 수 있는 두 가지 방법이 있다.
첫 번째 방법은 사용set_downstream
과set_upstream
이다.이 경우 set_upstream
를 사용하여 python_task
BASH 작업에 의존하거나 다운스트림 버전에 대해 동일한 작업을 수행할 수 있습니다.
# This means that will depend on bashtask
# running successfully to run.
bashtask.set_upstream(python_task)
# similar to above where dummy_task will depend on bashtask
dummy_task.set_downstream(bashtask)
이 기본 설정을 사용하면 BASH 작업이 성공하면 Python 작업이 실행됩니다.이와 유사하게 dummy_task
BASH 임무의 완성에 달려 있다.
의존 관계를 정의하는 두 번째 방법은 위치 이동 연산자를 사용하는 것이다.비트 이동 연산자에 익숙하지 않은 사람에게는 >> 또는 <처럼 보입니다.
예를 들어, BASH 작업에 의존하는 Python 작업을 참조하려면 bashtask >> python_task
로 작성할 수 있습니다.
지금, 만약 당신이 몇 가지 임무를 하나에 의존한다면?
그리고 너는 그것들을 하나의 목록으로 열거할 수 있다.이러한 상황에서 Python 임무와 dummy_task
모두 BASH 임무에 의존하고 BASH 임무가 완성된 후에 병행 수행한다.set_downstream
메서드나 비트 이동 연산자를 사용할 수 있습니다.bashtask.set_downstream([python_task, dummy_task])
첫 번째 공기 흐름 파이프
지금 우리는 이미 모든 다른 부분을 복습했으니, 우리는 그것들을 함께 놓을 수 있다.다음은 첫 번째 기본 기류 파이프입니다.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
#With this set to true, the pipeline won't run if the previous day failed
'email': ['[email protected]'],
'email_on_failure': True,
#upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}
dag = DAG(
'basic_dag_1',
default_args=default_args,
schedule_interval=timedelta(days=1),
)
def my_func():
print('Hello from my_func')
bashtask = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
dummy_task = DummyOperator(task_id='dummy_task', retries=3)
python_task = PythonOperator(task_id='python_task', python_callable=my_func)
dummy_task.set_downstream(bashtask)
python_task.set_downstream(bashtask)
DAG 공기 흐름 스케줄러 추가
Airflow 데이터베이스를 초기화했다면 웹 서버를 사용하여 새 DAG에 추가할 수 있습니다.다음 명령을 사용하여 파이프에 추가할 수 있습니다.
airflow webserver
airflow scheduler
최종 결과는 다음과 같이 공기 흐름 대시보드에 표시됩니다.
Reference
이 문제에 관하여(첫 번째 공기 흐름 파이프를 작성하는 방법), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/seattledataguy/how-to-write-your-first-airflow-pipeline-2j43
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
dag = DAG(
'basic_dag_1',
default_args=default_args,
schedule_interval=timedelta(days=1),
)
다음 예제에서 우리는 세 가지 임무 사용
PythonOperator
, DummyOperator
, BashOperator
을 가지고 있다.from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def my_func():
print('Hello from my_func')
bashtask = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
dummy_task = DummyOperator(task_id='dummy_task', retries=3)
python_task = PythonOperator(task_id='python_task', python_callable=my_func)
이 임무들은 모두 매우 간단하다.각각의 함수가 다르기 때문에 서로 다른 매개 변수가 필요하다는 것을 알 수 있습니다.DummyOperator
는 빈 조작부호일 뿐입니다. 파이프가 완성되었음을 표시하는 것 외에 실제로는 아무것도 하지 않는 단계를 만들 수 있습니다.PythonOperator
파이썬 함수를 호출할 수 있고 매개 변수를 전달할 수도 있습니다.BashOperator
bash 명령을 호출할 수 있습니다.다음은 임무만 쓰겠습니다.이 작업은 모든 부품을 함께 추가해야 적용됩니다.
이 기본 작업을 사용하면 의존 항목, 즉 작업 수행 순서를 정의할 수 있습니다.
의존 항목 정의
임무 간의 의존 관계를 정의할 수 있는 두 가지 방법이 있다.
첫 번째 방법은 사용set_downstream
과set_upstream
이다.이 경우 set_upstream
를 사용하여 python_task
BASH 작업에 의존하거나 다운스트림 버전에 대해 동일한 작업을 수행할 수 있습니다.
# This means that will depend on bashtask
# running successfully to run.
bashtask.set_upstream(python_task)
# similar to above where dummy_task will depend on bashtask
dummy_task.set_downstream(bashtask)
이 기본 설정을 사용하면 BASH 작업이 성공하면 Python 작업이 실행됩니다.이와 유사하게 dummy_task
BASH 임무의 완성에 달려 있다.
의존 관계를 정의하는 두 번째 방법은 위치 이동 연산자를 사용하는 것이다.비트 이동 연산자에 익숙하지 않은 사람에게는 >> 또는 <처럼 보입니다.
예를 들어, BASH 작업에 의존하는 Python 작업을 참조하려면 bashtask >> python_task
로 작성할 수 있습니다.
지금, 만약 당신이 몇 가지 임무를 하나에 의존한다면?
그리고 너는 그것들을 하나의 목록으로 열거할 수 있다.이러한 상황에서 Python 임무와 dummy_task
모두 BASH 임무에 의존하고 BASH 임무가 완성된 후에 병행 수행한다.set_downstream
메서드나 비트 이동 연산자를 사용할 수 있습니다.bashtask.set_downstream([python_task, dummy_task])
첫 번째 공기 흐름 파이프
지금 우리는 이미 모든 다른 부분을 복습했으니, 우리는 그것들을 함께 놓을 수 있다.다음은 첫 번째 기본 기류 파이프입니다.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
#With this set to true, the pipeline won't run if the previous day failed
'email': ['[email protected]'],
'email_on_failure': True,
#upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}
dag = DAG(
'basic_dag_1',
default_args=default_args,
schedule_interval=timedelta(days=1),
)
def my_func():
print('Hello from my_func')
bashtask = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
dummy_task = DummyOperator(task_id='dummy_task', retries=3)
python_task = PythonOperator(task_id='python_task', python_callable=my_func)
dummy_task.set_downstream(bashtask)
python_task.set_downstream(bashtask)
DAG 공기 흐름 스케줄러 추가
Airflow 데이터베이스를 초기화했다면 웹 서버를 사용하여 새 DAG에 추가할 수 있습니다.다음 명령을 사용하여 파이프에 추가할 수 있습니다.
airflow webserver
airflow scheduler
최종 결과는 다음과 같이 공기 흐름 대시보드에 표시됩니다.
Reference
이 문제에 관하여(첫 번째 공기 흐름 파이프를 작성하는 방법), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/seattledataguy/how-to-write-your-first-airflow-pipeline-2j43
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
# This means that will depend on bashtask
# running successfully to run.
bashtask.set_upstream(python_task)
# similar to above where dummy_task will depend on bashtask
dummy_task.set_downstream(bashtask)
지금 우리는 이미 모든 다른 부분을 복습했으니, 우리는 그것들을 함께 놓을 수 있다.다음은 첫 번째 기본 기류 파이프입니다.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
#With this set to true, the pipeline won't run if the previous day failed
'email': ['[email protected]'],
'email_on_failure': True,
#upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}
dag = DAG(
'basic_dag_1',
default_args=default_args,
schedule_interval=timedelta(days=1),
)
def my_func():
print('Hello from my_func')
bashtask = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
dummy_task = DummyOperator(task_id='dummy_task', retries=3)
python_task = PythonOperator(task_id='python_task', python_callable=my_func)
dummy_task.set_downstream(bashtask)
python_task.set_downstream(bashtask)
DAG 공기 흐름 스케줄러 추가
Airflow 데이터베이스를 초기화했다면 웹 서버를 사용하여 새 DAG에 추가할 수 있습니다.다음 명령을 사용하여 파이프에 추가할 수 있습니다.
airflow webserver
airflow scheduler
최종 결과는 다음과 같이 공기 흐름 대시보드에 표시됩니다.
Reference
이 문제에 관하여(첫 번째 공기 흐름 파이프를 작성하는 방법), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/seattledataguy/how-to-write-your-first-airflow-pipeline-2j43
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
airflow webserver
airflow scheduler
Reference
이 문제에 관하여(첫 번째 공기 흐름 파이프를 작성하는 방법), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/seattledataguy/how-to-write-your-first-airflow-pipeline-2j43텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)