Airflow에서 redash 쿼리 결과를 GCS로 보내기
Airflow에서 redash 쿼리 결과를 API를 사용하여 GCS로 보낼 수 있도록 조사한 개인 메모
하고 싶은 일
그림에 나타내면 아래와 같이 됩니다.
작업 환경과 이용 서비스 (ver는 작업 당시의 것)
DAG의 흐름
간단한 설명입니다만, 이번은 아래와 같이 2개의 순서를 밟아 데이터를 보냈습니다.
redash 쿼리 API를 Airflow에서 실행
획득한 데이터를 GCS로 전송
샘플 코드
이쪽도, 매우 대략적으로 됩니다만 올립니다.
redash의 쿼리 API 키는 csv를 사용합니다.
쿼리 API 키의 관리에 대해서는 Airflow의
Connections
등으로 관리하는 것이 좋을까 생각합니다(여기에서는 굳이 직접 쓰고 있습니다).import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from google.cloud import storage
import requests
client = storage.Client()
default_args = {
'owner': 'task owner',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'execution_timeout': datetime.timedelta(hours=1),
'retries': 3,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2019, 08, 3),
}
def redash_to_gcs(ds, **kwargs):
result = requests.get('{redashのクエリAPIキー}')
print(result)
bucket = client.get_bucket('{送り先bucket}')
blob = bucket.blob("{出力ファイル名}.csv")
blob.upload_from_string(result.text.encode('utf-8'))
to_gcs = PythonOperator(
task_id='echo_results',
python_callable=redash_to_gcs,
provide_context=True
)
with airflow.DAG(
'salesforce',
'catchup=False',
default_args=default_args,
schedule_interval='0 23 * * *') as dag:
start_task = DummyOperator(task_id='start')
finish_task = DummyOperator(task_id='finish')
to_gcs = PythonOperator(
task_id='echo_results',
python_callable=redash_to_gcs,
provide_context=True)
start_taslk >> to_gcs >> finish_task
Reference
이 문제에 관하여(Airflow에서 redash 쿼리 결과를 GCS로 보내기), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/senohirona/items/c75c772b7f2b8d11df6f텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)