Airflow에서 redash 쿼리 결과를 GCS로 보내기

별로 없는 장면이라고 생각합니다만, 「이런 일이 있었다」가 엄청나게…
Airflow에서 redash 쿼리 결과를 API를 사용하여 GCS로 보낼 수 있도록 조사한 개인 메모

하고 싶은 일


  • redash 쿼리 API를 실행하고 검색 결과를 GCS에 업로드합니다.

    그림에 나타내면 아래와 같이 됩니다.


    작업 환경과 이용 서비스 (ver는 작업 당시의 것)


  • GCP Cloud Composer
  • Airflow(ver 1.10.2)

  • redash (var 5.0)
  • GCS bucket (이미 준비됨)

  • DAG의 흐름



    간단한 설명입니다만, 이번은 아래와 같이 2개의 순서를 밟아 데이터를 보냈습니다.

    redash 쿼리 API를 Airflow에서 실행


  • redash에 관한 operator가 없기 때문에 파이썬의 "requests"를 이용하여 API를 두드린다.
  • Requests: HTTP for Humans


  • 획득한 데이터를 GCS로 전송


  • "google-cloud-storage"라는 라이브러리를 사용하여 API 데이터를 GCS로 보내기
  • google-cloud-storage


  • 샘플 코드



    이쪽도, 매우 대략적으로 됩니다만 올립니다.
    redash의 쿼리 API 키는 csv를 사용합니다.
    쿼리 API 키의 관리에 대해서는 Airflow의 Connections등으로 관리하는 것이 좋을까 생각합니다(여기에서는 굳이 직접 쓰고 있습니다).
    import datetime
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from google.cloud import storage
    import requests
    
    client = storage.Client()
    
    default_args = {
        'owner': 'task owner',
        'depends_on_past': False,
        'email': [''],
        'email_on_failure': False,
        'email_on_retry': False,
        'execution_timeout': datetime.timedelta(hours=1),
        'retries': 3,
        'retry_delay': datetime.timedelta(minutes=5),
        'start_date': datetime.datetime(2019, 08, 3),
    }
    
    
    
    def redash_to_gcs(ds, **kwargs):
        result = requests.get('{redashのクエリAPIキー}')
        print(result)
        bucket = client.get_bucket('{送り先bucket}')
        blob = bucket.blob("{出力ファイル名}.csv")
        blob.upload_from_string(result.text.encode('utf-8'))
    
    to_gcs = PythonOperator(
        task_id='echo_results',
        python_callable=redash_to_gcs,
        provide_context=True
    )
    
    
    with airflow.DAG(
            'salesforce',
            'catchup=False',
            default_args=default_args,
            schedule_interval='0 23 * * *') as dag:
    
        start_task = DummyOperator(task_id='start')
        finish_task = DummyOperator(task_id='finish')
    
    
        to_gcs = PythonOperator(
            task_id='echo_results',
            python_callable=redash_to_gcs,
            provide_context=True)
    
        start_taslk >> to_gcs >> finish_task
    

    좋은 웹페이지 즐겨찾기