03_DB and Executors
수백 개의 tasks, DAGs는 동시에 어떻게 다룰 것인가?
- Executor
LocalExecutor
CeleryExecutor
- parameter(Tasks, DAGs 동시 수행 관련)
- parallelism
- concurrency(
dag_concurrency
,concurrency(해당 dag만)
) - max_active_run(
max_active_runs_per_dag
,max_active_runs(해당 dag만)
)
DAGs에서 여러 Task를 동시에 수행하기 위한 방법
- SequentialExecutor (Sequential하게 task 하나씩 진행)
- LocalExecutor (Single Machine에서 multiple tasks 수행 가능)
- CeleryExecutor (Multiple Machines에서 multiple tasks 수행 가능)
SequentialExecutor
SequentialExecutor 내에서는
task1
>> [task2, task3]
>> task4
으로 구성하는 경우,
task1
-> task2 or task3
-> task2, task3 중 앞에서 수행하지 않은 것
-> task4
순으로 동작한다.
즉, 순차적으로 진행된다.
task2, task3
이 동시에, 병렬적으로 수행되게 하려면 어떻게 해야 하는가?
LocalExecutor
2가지 설정이 선행되어야 한다.
1. 순차 데이터베이스(SQLite
)는 한번에 하나의 writer만 가지기 때문에, 데이터베이스를 변경해야 한다.(postgresql
로 변경)
2. SequentialExecutor
를 LocalExecutor
로 변경해야 한다.
1. Postgresql 설치 및 MetaDB를 Postgresql로 변경
- postgresql 설치
$ sudo apt update
$ sudo apt install postgresql
- connect to postgresql
$ sudo -u postgres psql
- password 설정
postgres=# ALTER USER postgres PASSWORD [PASSWORD]
postgresql과 연결 준비 완료
- extra package
$ pip install 'apache-airflow[postgres]'
postgres를 airflow의 metastore(metaDB)로 사용할 준비 완료
-
airflow.cfg의
sql_alchemy_conn = ...
을
sql_alchemy_conn = postgresql+psycopg2://postgres:[PASSWORD]@localhost/postgres
로 변경 -
저장 후 제대로 설정되었는지 metaDB 확인
$ airflow db check
로 확인할 수 있다.
2. LocalExecutor로 executor 변경
- airflow.cfg의
executor = SequentialExecutor
를
executor = LocalExecutor
로 변경
init, ID 생성 등 다시 수행
$ airflow db init
$ airflow users create \
--username [username] \
--firstname [firstname] \
--lastname [lastname] \
--role Admin \
--password [password] \
--email [[email protected]]
이후 적용시키려면 webserver를 재시작해야 한다.
Gantt를 확인하면 두 가지 작업이 동시에 수행되는 것을 확인할 수 있다.
(Single Machine에서도 자원에 따라 동시에 수행할 수 있음을 알 수 있다.)
- 최대한 많이 수행하려면?
- 예를 들어 Single Machine에서 100개 이상의 작업을 동시에 수행하는 것은 어려움이 있을 것이다.
- 다른 executor를 선택(필요한 만큼 확장하기 위해서)
CeleryExecutor
KubernetesExecutor
- 무한한 task를 동시에 수행 가능한가?
가장 중요한 것은 모든 executor가 순서에 맞게 실행되어야 한다는 점이다. > Queue
를 사용.
LocalExecutor
에서는 Task를Queue
에 Push,- Task가 실행될 준비가 되면
Queue
에서 Pulled out, execute된다.
CeleryExecutor
LocalExecutor
로 Single Machine에서 multiple tasks를 parallel하게 수행할 수 있지만, 이 역시 한계가 있을 수 있다. -> Multiple Machines(Workers)에서 Multiple Tasks를 수행해야 할때는?"CeleryExecutor"
Queue 역할로 Redis를 사용한다.
CeleryExecutor
CeleryExecutor
는 tasks를 multiple machines에서 수행 가능하도록 해준다.(반면,LocalExecutor
에서는 tasks를 single machine에서만 수행할 수 있다.)CeleryExecutor
를 사용하고 싶다면 external tool을 install and setup해야 한다.(Redis 같은)- 각각의 Worker는 일치해야 한다.
각 worker는 machine에 해당하므로, 각 machine에는 task가 실행되게 하는 인스턴스가 실행된다. 그러므로 모든 machines가 same dependencies를 공유해야 한다.(예를 들어 작업이 AWS와 상호작용한다고 하면, interact를 위한 python module을 모든 machines에 설치해야 한다. 그렇지 않으면 오류가 발생한다.)
CeleryExecutor 설정
Celery 설정
$ pip install 'apache-airflow[celery]'
airflow.cfg에서 executor = CeleryExecutor
로 변경
Redis 설정
message broker 역할(Queue)로 Redis
를 설치
$ sudo apt update
$ sudo apt install redis-server
- redis 설정 변경
$ sudo nano /etc/redis/redis.conf
들어가서
supervised no
-> `supervised systemd로 변경 후 저장(ctrl + x) - restart 해서 적용 및 확인
$ sudo systemctl restart redis.service
$ sudo systemctl status redis.service
->active : active(running)
상태 확인
celery를 세팅하기 위해 변경해야 할 parameter 설정.
-
broker_url
: airflow.cfg의
broker_url - celeryExecutor에서 task를 redis message로 push하기 위해서
사용된다.broker_url = redis://redis:6379/0
->broker_url = redis://localhost:6379/0
6379는 포트이고 0은 데이터베이스 이름이 0이라는 뜻이다.
-
result_backend
: celery에서 해당 작업의 실행과 관련된 일부 metadata가 저장된다.
=> result_backend 설정해야 하는 이유이다.result_backend = db+postgresql://postgres:airflow@postgres/airflow
->result_backend = postgresql+psycopg2://postgres:postgres@localhost/postgres
- 추가 설정
broker_url를 redis와 연결되게 설정했기 때문에 redis 패키지 관련 설치
$ pip install 'apache-airflow[redis]'
Flower
- flower란 airflow에서 CeleryExecutor에 default 로 제공하는 UI로
tasks가 실행되는 workers, machines를 모니터링하게 해준다.2.1.0 version에서는 default가 아닌 것으로 보인다. -> Link를 이용해서 버전 변경 후
$ airflow celery flower
로 동작한다.
- worker 추가하는 방법?
result_backend = postgresql+psycopg2://postgres:postgres@localhost/postgres
에서
result_backend = db+postgresql://postgres:postgres@localhost/postgres
로 변경
$ airflow celery worker
- flower UI에서 worker가 추가 된 것을 확인할 수 있다.(이제 해당 worker(machine)에 task를 추가할 수 있다.)
- Trigger 시에 worker에서 성공, 실패 등 task의 상태를 점검할 수 있다.
Task, DAG의 수 제한
- Airflow instance 내의 Task의 총 수를 제한하고 싶다면? (
parallelism
)
- 하나의 worker에서 수행되는 task의 수를 제한하고 싶다면? (
dag_concurrency
, concurrency
)
- DAGrun의 수를 제한하고 싶다면?(
max_active_runs_per_age
, max_active_runs
)
parallelism
parallelism
: 전체 DAG instance에서 실행 가능한 최대 task 수를 정의
parallelism
)dag_concurrency
, concurrency
)max_active_runs_per_age
, max_active_runs
)parallelism
: 전체 DAG instance에서 실행 가능한 최대 task 수를 정의ex) LocalExecutor를 사용하더라도 parallelism = 1로 설정하면
SequentialExecutor와 동일하게 동작한다.
concurrency
dag_concurrency
: 전체 DAG instance에서 동시에 수행할 수 있는 최대 task의 수concurrency
: 각각의 DAG에서 설정할 수 있으며 해당 DAG에서 동시에 수행할 수 있는 task의 수를 제한
# x는 해당 DAG에서 동시에 수행할 수 있는 최대 tasks 수
with DAG(..., concurrency = x)
max_active_runs
max_active_runs_per_dag
: 전체 DAG instance에서 동시에 수행할 수 있는 최대 DAG의 수max_active_runs
: 해당 DAG에 허용되는 활성 DAG 실행의 최대 수
# x는 해당 DAG에 허용되는 활성 DAG 실행의 최대 수
with DAG(..., concurrency = x)
예를 들어) 하나의 DAGrun이 일부
데이터
를 처리하며,
이데이터
는 다음 DAGrun에서 사용된다고 가정하자.
이전 Dagrun이 수행되고 다음 DAGrun이 수행되어야 하므로, 동시에 실행할 수 있는
DAGrun의 수를 제한하는 것이 필요하다. ->max_active_runs
사용
Exercise)
1)
parallelism = 4이고
dags_concurrency = 6이면?
parallelism에 우선순위가 있기 때문에
한 dag에서 동시에 수행 가능한 task의 수는 4이다.
2)
2개의 DAG가 있다.
DAG A [T1, T2] >> T3
DAG B [T1, T2] >> T3
parallelism = 4
dags_concurrency = 2
max_active_runs_per_dag = 1
이면, DAG A, DAG B가 동시에 수행될 수 있는가?
parallelism, dags_concurrency만 봐서는 가능하지만,
한번에 최대 1개의 DAG만 동시에 수행 가능하기 때문에 불가능하다.
Author And Source
이 문제에 관하여(03_DB and Executors), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@minj10092/03DB-and-Executors저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)