Airflow를 사용하여 일휴 데이터 흐름 실행
나는 데이터 과학부의 작은 섬이다.
지금 주위를 평론하는 일을 맡고 있습니다.
시험적으로 가져올 ETL 도구인 Airflow를 쓰고 싶습니다.
에어플로우
한마디로 고기능의 크론.
Python을 통해 모든 작업의 의존 관계를 정의할 수 있습니다.
에어비앤비가 독자 개발한 것이었는데, 지금은 아파치의 인큐버 프로젝트다.
일시적인 데이터 흐름을 고려해 보세요.
쉬는 시간에 모든 세션의 정보를 합쳤다.
그 데이터를 DWH로 가공하기 전의 절차는 다음과 같다.
나는 에어플로우로 상술한 데이터 처리 날짜를 실현하고 싶다.
환경 구축
이번에는 간단하게 github가 환경을 공개한 사람의 명의를 빌려 설정한다.
https://github.com/puckel/docker-airflow $ git clone [email protected]:puckel/docker-airflow.git
$ cd docker-airflow/
$ docker-compose -f docker-compose-CeleryExecutor.yml up -d
에어플로우에 엑시큐터가 있어요.
사용된 Executor에 따라 병렬 또는 직렬 처리 방법을 선택할 수 있습니다.
이번에는 셀리를 사용하는 Executor를 선택했습니다.
위의 명령만 있으면 다음 항목에 액세스할 수 있습니다.
화면 관리:localhost:8080
Flower: localhost:5555
이루어지다
우선 개별적인 처리를 쓰다.
예를 들어 DB에서 1일간 호텔 정보를 받아 CSV 처리에 저장한다고 적혀 있다.import pandas
import datetime
from jinja2 import Template
def make_hotel_table(**kwargs):
# データ取得するsqlの実行
#実行日時の取得
execution_date = kwargs.get('execution_date').date()
prev_execution_date = execution_date - datetime.timedelta(days=1)
with open("make_hotel_table.sql", 'r') as f:
sql = " ".join(f.readlines())
sql = Template(sql).render(date_from=prev_execution_date, date_to=execution_date)
conn = pymssql.connect(server='test', port='test', user='test', password='test')
data = pd.read_sql_query(sql, conn)
data.to_csv('hotel_table.csv')
return True
이런 느낌으로 개별적인 처리를 다 기술할 수 있다면
이후 전체적인 일정 설정과 임무의 정의, 의존 관계만 기술하면 된다.from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from func import *
default_args = {
'owner': 'ikyu',
# いつからデータをためるかを定義
'start_date': datetime(2017, 10, 1),
}
dag = DAG(
'ikyu_etl', default_args=default_args, schedule_interval='@daily') # いつ実行するかを定義
make_hotel_table = PythonOperator(
task_id='make_hotel_table',
provide_context=True,
python_callable=make_hotel_table,
dag=dag
)
make_booking_table = PythonOperator(
task_id='make_booking_table',
provide_context=True,
python_callable=make_booking_table,
dag=dag
)
make_user_table = PythonOperator(
task_id='make_user_table',
provide_context=True,
python_callable=make_user_table,
dag=dag
)
make_access_log = PythonOperator(
task_id='make_access_log',
provide_context=True,
python_callable=make_access_log,
dag=dag
)
combine_hotel_booking = PythonOperator(
task_id='combine_hotel_booking',
provide_context=True,
python_callable=combine_hotel_booking,
dag=dag
)
masking = PythonOperator(
task_id='masking',
provide_context=True,
python_callable=masking,
dag=dag
)
put_to_bigquery = PythonOperator(
task_id='put_to_bigquery',
provide_context=True,
python_callable=put_to_bigquery,
dag=dag
)
make_session_data = PythonOperator(
task_id='make_session_data',
provide_context=True,
python_callable=make_session_data,
dag=dag
)
combine_session_user = PythonOperator(
task_id='combine_session_user',
provide_context=True,
python_callable=combine_session_user,
dag=dag
)
combine_all_data = PythonOperator(
task_id='combine_all_data',
provide_context=True,
python_callable=combine_all_data,
dag=dag
)
append_to_dwh = PythonOperator(
task_id='append_to_dwh',
provide_context=True,
python_callable=append_to_dwh,
dag=dag
)
# 異存関係の定義
make_hotel_table.set_downstream(combine_hotel_booking)
make_booking_table.set_downstream(combine_hotel_booking)
make_user_table.set_downstream(masking)
make_access_log.set_downstream(put_to_bigquery)
put_to_bigquery.set_downstream(make_session_data)
masking.set_downstream(combine_session_user)
make_session_data.set_downstream(combine_session_user)
combine_hotel_booking.set_downstream(combine_all_data)
combine_session_user.set_downstream(combine_all_data)
combine_all_data.set_downstream(append_to_dwh)
그 결과 에어플로우의 관리 화면은 다음과 같다.
의존 관계자를 잘 정의한 것 같아서요.
실제로 실행해 보도록 하겠습니다.
일정은 DAG 옆에 있는 On Off 스위치를 클릭하면 간단합니다.
이번 DAG 자체 시작 시간은 2017/10/01로 설정됐다.
데이터 흐름으로서의 생각
- 데이터 수집 시작 시기 = > 데이터 삽입 시작 날짜
- 수행 시기 => 일정 간격(하루에 한 번, 1시간 1층 등)
이 두 가지는 따로 고려해야 한다.
데이터 삽입 시작 일로부터 오늘(12/11) 사이
일정 간격에 설정된 모든 실행 가능한 작업을 처리합니다.
그래서 일정을 On으로 잡으면
2017/10/1부터 오늘까지 매일 수행할 수 있는 작업을 처리할 수 있습니다.
Tree View를 보면 실제로 그렇다는 것을 확인할 수 있습니다.
이런 느낌으로 Airflow를 실행해 보세요!
끝말
뭐라고 쓰면 Airflow 도구 소개인 것 같아요.
앞으로 이걸로 ETL 주위와 평론 처리 절차를 실현할 수 있다면 좋겠다.
내일은 @japboy씨의 웹 Components FTW에 근거합니다.
Reference
이 문제에 관하여(Airflow를 사용하여 일휴 데이터 흐름 실행), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/kokojima/items/3860edce1ea3c5fa2120
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
쉬는 시간에 모든 세션의 정보를 합쳤다.
그 데이터를 DWH로 가공하기 전의 절차는 다음과 같다.
나는 에어플로우로 상술한 데이터 처리 날짜를 실현하고 싶다.
환경 구축
이번에는 간단하게 github가 환경을 공개한 사람의 명의를 빌려 설정한다.
https://github.com/puckel/docker-airflow $ git clone [email protected]:puckel/docker-airflow.git
$ cd docker-airflow/
$ docker-compose -f docker-compose-CeleryExecutor.yml up -d
에어플로우에 엑시큐터가 있어요.
사용된 Executor에 따라 병렬 또는 직렬 처리 방법을 선택할 수 있습니다.
이번에는 셀리를 사용하는 Executor를 선택했습니다.
위의 명령만 있으면 다음 항목에 액세스할 수 있습니다.
화면 관리:localhost:8080
Flower: localhost:5555
이루어지다
우선 개별적인 처리를 쓰다.
예를 들어 DB에서 1일간 호텔 정보를 받아 CSV 처리에 저장한다고 적혀 있다.import pandas
import datetime
from jinja2 import Template
def make_hotel_table(**kwargs):
# データ取得するsqlの実行
#実行日時の取得
execution_date = kwargs.get('execution_date').date()
prev_execution_date = execution_date - datetime.timedelta(days=1)
with open("make_hotel_table.sql", 'r') as f:
sql = " ".join(f.readlines())
sql = Template(sql).render(date_from=prev_execution_date, date_to=execution_date)
conn = pymssql.connect(server='test', port='test', user='test', password='test')
data = pd.read_sql_query(sql, conn)
data.to_csv('hotel_table.csv')
return True
이런 느낌으로 개별적인 처리를 다 기술할 수 있다면
이후 전체적인 일정 설정과 임무의 정의, 의존 관계만 기술하면 된다.from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from func import *
default_args = {
'owner': 'ikyu',
# いつからデータをためるかを定義
'start_date': datetime(2017, 10, 1),
}
dag = DAG(
'ikyu_etl', default_args=default_args, schedule_interval='@daily') # いつ実行するかを定義
make_hotel_table = PythonOperator(
task_id='make_hotel_table',
provide_context=True,
python_callable=make_hotel_table,
dag=dag
)
make_booking_table = PythonOperator(
task_id='make_booking_table',
provide_context=True,
python_callable=make_booking_table,
dag=dag
)
make_user_table = PythonOperator(
task_id='make_user_table',
provide_context=True,
python_callable=make_user_table,
dag=dag
)
make_access_log = PythonOperator(
task_id='make_access_log',
provide_context=True,
python_callable=make_access_log,
dag=dag
)
combine_hotel_booking = PythonOperator(
task_id='combine_hotel_booking',
provide_context=True,
python_callable=combine_hotel_booking,
dag=dag
)
masking = PythonOperator(
task_id='masking',
provide_context=True,
python_callable=masking,
dag=dag
)
put_to_bigquery = PythonOperator(
task_id='put_to_bigquery',
provide_context=True,
python_callable=put_to_bigquery,
dag=dag
)
make_session_data = PythonOperator(
task_id='make_session_data',
provide_context=True,
python_callable=make_session_data,
dag=dag
)
combine_session_user = PythonOperator(
task_id='combine_session_user',
provide_context=True,
python_callable=combine_session_user,
dag=dag
)
combine_all_data = PythonOperator(
task_id='combine_all_data',
provide_context=True,
python_callable=combine_all_data,
dag=dag
)
append_to_dwh = PythonOperator(
task_id='append_to_dwh',
provide_context=True,
python_callable=append_to_dwh,
dag=dag
)
# 異存関係の定義
make_hotel_table.set_downstream(combine_hotel_booking)
make_booking_table.set_downstream(combine_hotel_booking)
make_user_table.set_downstream(masking)
make_access_log.set_downstream(put_to_bigquery)
put_to_bigquery.set_downstream(make_session_data)
masking.set_downstream(combine_session_user)
make_session_data.set_downstream(combine_session_user)
combine_hotel_booking.set_downstream(combine_all_data)
combine_session_user.set_downstream(combine_all_data)
combine_all_data.set_downstream(append_to_dwh)
그 결과 에어플로우의 관리 화면은 다음과 같다.
의존 관계자를 잘 정의한 것 같아서요.
실제로 실행해 보도록 하겠습니다.
일정은 DAG 옆에 있는 On Off 스위치를 클릭하면 간단합니다.
이번 DAG 자체 시작 시간은 2017/10/01로 설정됐다.
데이터 흐름으로서의 생각
- 데이터 수집 시작 시기 = > 데이터 삽입 시작 날짜
- 수행 시기 => 일정 간격(하루에 한 번, 1시간 1층 등)
이 두 가지는 따로 고려해야 한다.
데이터 삽입 시작 일로부터 오늘(12/11) 사이
일정 간격에 설정된 모든 실행 가능한 작업을 처리합니다.
그래서 일정을 On으로 잡으면
2017/10/1부터 오늘까지 매일 수행할 수 있는 작업을 처리할 수 있습니다.
Tree View를 보면 실제로 그렇다는 것을 확인할 수 있습니다.
이런 느낌으로 Airflow를 실행해 보세요!
끝말
뭐라고 쓰면 Airflow 도구 소개인 것 같아요.
앞으로 이걸로 ETL 주위와 평론 처리 절차를 실현할 수 있다면 좋겠다.
내일은 @japboy씨의 웹 Components FTW에 근거합니다.
Reference
이 문제에 관하여(Airflow를 사용하여 일휴 데이터 흐름 실행), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/kokojima/items/3860edce1ea3c5fa2120
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
$ git clone [email protected]:puckel/docker-airflow.git
$ cd docker-airflow/
$ docker-compose -f docker-compose-CeleryExecutor.yml up -d
우선 개별적인 처리를 쓰다.
예를 들어 DB에서 1일간 호텔 정보를 받아 CSV 처리에 저장한다고 적혀 있다.
import pandas
import datetime
from jinja2 import Template
def make_hotel_table(**kwargs):
# データ取得するsqlの実行
#実行日時の取得
execution_date = kwargs.get('execution_date').date()
prev_execution_date = execution_date - datetime.timedelta(days=1)
with open("make_hotel_table.sql", 'r') as f:
sql = " ".join(f.readlines())
sql = Template(sql).render(date_from=prev_execution_date, date_to=execution_date)
conn = pymssql.connect(server='test', port='test', user='test', password='test')
data = pd.read_sql_query(sql, conn)
data.to_csv('hotel_table.csv')
return True
이런 느낌으로 개별적인 처리를 다 기술할 수 있다면이후 전체적인 일정 설정과 임무의 정의, 의존 관계만 기술하면 된다.
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from func import *
default_args = {
'owner': 'ikyu',
# いつからデータをためるかを定義
'start_date': datetime(2017, 10, 1),
}
dag = DAG(
'ikyu_etl', default_args=default_args, schedule_interval='@daily') # いつ実行するかを定義
make_hotel_table = PythonOperator(
task_id='make_hotel_table',
provide_context=True,
python_callable=make_hotel_table,
dag=dag
)
make_booking_table = PythonOperator(
task_id='make_booking_table',
provide_context=True,
python_callable=make_booking_table,
dag=dag
)
make_user_table = PythonOperator(
task_id='make_user_table',
provide_context=True,
python_callable=make_user_table,
dag=dag
)
make_access_log = PythonOperator(
task_id='make_access_log',
provide_context=True,
python_callable=make_access_log,
dag=dag
)
combine_hotel_booking = PythonOperator(
task_id='combine_hotel_booking',
provide_context=True,
python_callable=combine_hotel_booking,
dag=dag
)
masking = PythonOperator(
task_id='masking',
provide_context=True,
python_callable=masking,
dag=dag
)
put_to_bigquery = PythonOperator(
task_id='put_to_bigquery',
provide_context=True,
python_callable=put_to_bigquery,
dag=dag
)
make_session_data = PythonOperator(
task_id='make_session_data',
provide_context=True,
python_callable=make_session_data,
dag=dag
)
combine_session_user = PythonOperator(
task_id='combine_session_user',
provide_context=True,
python_callable=combine_session_user,
dag=dag
)
combine_all_data = PythonOperator(
task_id='combine_all_data',
provide_context=True,
python_callable=combine_all_data,
dag=dag
)
append_to_dwh = PythonOperator(
task_id='append_to_dwh',
provide_context=True,
python_callable=append_to_dwh,
dag=dag
)
# 異存関係の定義
make_hotel_table.set_downstream(combine_hotel_booking)
make_booking_table.set_downstream(combine_hotel_booking)
make_user_table.set_downstream(masking)
make_access_log.set_downstream(put_to_bigquery)
put_to_bigquery.set_downstream(make_session_data)
masking.set_downstream(combine_session_user)
make_session_data.set_downstream(combine_session_user)
combine_hotel_booking.set_downstream(combine_all_data)
combine_session_user.set_downstream(combine_all_data)
combine_all_data.set_downstream(append_to_dwh)
그 결과 에어플로우의 관리 화면은 다음과 같다.의존 관계자를 잘 정의한 것 같아서요.
실제로 실행해 보도록 하겠습니다.
일정은 DAG 옆에 있는 On Off 스위치를 클릭하면 간단합니다.
이번 DAG 자체 시작 시간은 2017/10/01로 설정됐다.
데이터 흐름으로서의 생각
- 데이터 수집 시작 시기 = > 데이터 삽입 시작 날짜
- 수행 시기 => 일정 간격(하루에 한 번, 1시간 1층 등)
이 두 가지는 따로 고려해야 한다.
데이터 삽입 시작 일로부터 오늘(12/11) 사이
일정 간격에 설정된 모든 실행 가능한 작업을 처리합니다.
그래서 일정을 On으로 잡으면
2017/10/1부터 오늘까지 매일 수행할 수 있는 작업을 처리할 수 있습니다.
Tree View를 보면 실제로 그렇다는 것을 확인할 수 있습니다.
이런 느낌으로 Airflow를 실행해 보세요!
끝말
뭐라고 쓰면 Airflow 도구 소개인 것 같아요.
앞으로 이걸로 ETL 주위와 평론 처리 절차를 실현할 수 있다면 좋겠다.
내일은 @japboy씨의 웹 Components FTW에 근거합니다.
Reference
이 문제에 관하여(Airflow를 사용하여 일휴 데이터 흐름 실행), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/kokojima/items/3860edce1ea3c5fa2120
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
뭐라고 쓰면 Airflow 도구 소개인 것 같아요.
앞으로 이걸로 ETL 주위와 평론 처리 절차를 실현할 수 있다면 좋겠다.
내일은 @japboy씨의 웹 Components FTW에 근거합니다.
Reference
이 문제에 관하여(Airflow를 사용하여 일휴 데이터 흐름 실행), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/kokojima/items/3860edce1ea3c5fa2120텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)