Airflow를 사용하여 일휴 데이터 흐름 실행

16689 단어 ETLairflow
이 글은 잠시 쉬다.com Advent Calenrad 2017의 11일째다.
나는 데이터 과학부의 작은 섬이다.
지금 주위를 평론하는 일을 맡고 있습니다.
시험적으로 가져올 ETL 도구인 Airflow를 쓰고 싶습니다.

에어플로우


한마디로 고기능의 크론.
Python을 통해 모든 작업의 의존 관계를 정의할 수 있습니다.
에어비앤비가 독자 개발한 것이었는데, 지금은 아파치의 인큐버 프로젝트다.

일시적인 데이터 흐름을 고려해 보세요.


쉬는 시간에 모든 세션의 정보를 합쳤다.
그 데이터를 DWH로 가공하기 전의 절차는 다음과 같다.

나는 에어플로우로 상술한 데이터 처리 날짜를 실현하고 싶다.

환경 구축


이번에는 간단하게 github가 환경을 공개한 사람의 명의를 빌려 설정한다.
https://github.com/puckel/docker-airflow
$ git clone [email protected]:puckel/docker-airflow.git
$ cd docker-airflow/
$ docker-compose -f docker-compose-CeleryExecutor.yml up -d
에어플로우에 엑시큐터가 있어요.
사용된 Executor에 따라 병렬 또는 직렬 처리 방법을 선택할 수 있습니다.
이번에는 셀리를 사용하는 Executor를 선택했습니다.
위의 명령만 있으면 다음 항목에 액세스할 수 있습니다.
화면 관리:localhost:8080

Flower: localhost:5555

이루어지다


우선 개별적인 처리를 쓰다.
예를 들어 DB에서 1일간 호텔 정보를 받아 CSV 처리에 저장한다고 적혀 있다.
import pandas
import datetime
from jinja2 import Template


def make_hotel_table(**kwargs):
    # データ取得するsqlの実行
    #実行日時の取得
    execution_date = kwargs.get('execution_date').date()
    prev_execution_date = execution_date - datetime.timedelta(days=1)
    with open("make_hotel_table.sql", 'r') as f:
        sql = " ".join(f.readlines())

    sql = Template(sql).render(date_from=prev_execution_date, date_to=execution_date)

    conn = pymssql.connect(server='test', port='test', user='test', password='test')
    data = pd.read_sql_query(sql, conn)
    data.to_csv('hotel_table.csv')

    return True
이런 느낌으로 개별적인 처리를 다 기술할 수 있다면
이후 전체적인 일정 설정과 임무의 정의, 의존 관계만 기술하면 된다.
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from func import *


default_args = {
    'owner': 'ikyu',
    # いつからデータをためるかを定義
    'start_date': datetime(2017, 10, 1),
}

dag = DAG(
    'ikyu_etl', default_args=default_args, schedule_interval='@daily') # いつ実行するかを定義

make_hotel_table = PythonOperator(
    task_id='make_hotel_table',
    provide_context=True,
    python_callable=make_hotel_table,
    dag=dag
)

make_booking_table = PythonOperator(
    task_id='make_booking_table',
    provide_context=True,
    python_callable=make_booking_table,
    dag=dag
)

make_user_table = PythonOperator(
    task_id='make_user_table',
    provide_context=True,
    python_callable=make_user_table,
    dag=dag
)

make_access_log = PythonOperator(
    task_id='make_access_log',
    provide_context=True,
    python_callable=make_access_log,
    dag=dag
)

combine_hotel_booking = PythonOperator(
    task_id='combine_hotel_booking',
    provide_context=True,
    python_callable=combine_hotel_booking,
    dag=dag
)

masking = PythonOperator(
    task_id='masking',
    provide_context=True,
    python_callable=masking,
    dag=dag
)

put_to_bigquery = PythonOperator(
    task_id='put_to_bigquery',
    provide_context=True,
    python_callable=put_to_bigquery,
    dag=dag
)

make_session_data = PythonOperator(
    task_id='make_session_data',
    provide_context=True,
    python_callable=make_session_data,
    dag=dag
)

combine_session_user = PythonOperator(
    task_id='combine_session_user',
    provide_context=True,
    python_callable=combine_session_user,
    dag=dag
)

combine_all_data = PythonOperator(
    task_id='combine_all_data',
    provide_context=True,
    python_callable=combine_all_data,
    dag=dag
)

append_to_dwh = PythonOperator(
    task_id='append_to_dwh',
    provide_context=True,
    python_callable=append_to_dwh,
    dag=dag
)


# 異存関係の定義
make_hotel_table.set_downstream(combine_hotel_booking)
make_booking_table.set_downstream(combine_hotel_booking)
make_user_table.set_downstream(masking)
make_access_log.set_downstream(put_to_bigquery)
put_to_bigquery.set_downstream(make_session_data)
masking.set_downstream(combine_session_user)
make_session_data.set_downstream(combine_session_user)
combine_hotel_booking.set_downstream(combine_all_data)
combine_session_user.set_downstream(combine_all_data)
combine_all_data.set_downstream(append_to_dwh)
그 결과 에어플로우의 관리 화면은 다음과 같다.

의존 관계자를 잘 정의한 것 같아서요.

실제로 실행해 보도록 하겠습니다.


일정은 DAG 옆에 있는 On Off 스위치를 클릭하면 간단합니다.

이번 DAG 자체 시작 시간은 2017/10/01로 설정됐다.
데이터 흐름으로서의 생각
- 데이터 수집 시작 시기 = > 데이터 삽입 시작 날짜
- 수행 시기 => 일정 간격(하루에 한 번, 1시간 1층 등)
이 두 가지는 따로 고려해야 한다.
데이터 삽입 시작 일로부터 오늘(12/11) 사이
일정 간격에 설정된 모든 실행 가능한 작업을 처리합니다.
그래서 일정을 On으로 잡으면
2017/10/1부터 오늘까지 매일 수행할 수 있는 작업을 처리할 수 있습니다.
Tree View를 보면 실제로 그렇다는 것을 확인할 수 있습니다.

이런 느낌으로 Airflow를 실행해 보세요!

끝말


뭐라고 쓰면 Airflow 도구 소개인 것 같아요.
앞으로 이걸로 ETL 주위와 평론 처리 절차를 실현할 수 있다면 좋겠다.
내일은 @japboy씨의 웹 Components FTW에 근거합니다.

좋은 웹페이지 즐겨찾기