P Cloud Composer - GCS에서 BigQuery로 데이터 로드

GCP Cloud Composer를 다루는 길 부터 계속
전회는 GCP의 다큐멘트를 참고로 AirflowCloud Composer 의 개요를 정리했습니다만 이번은 실제로 Cloud Composer 서비스를 사용한 샘플 어플리케이션을 구현해 보겠습니다.

시스템 구성



DAG 흐름도



이번에는 start와 from_exsample-composer-gcs_import_bigquery와 end의 세 가지 작업을 만듭니다.

작업 목록 작업 ID 설명 시작 작업 시작을 나타내는 더미 작업 from_exsample-composer-gcs_import_bigquery GCS에서 BigQuery로 데이터를 전송하는 작업 끝 작업 종료를 나타내는 더미 작업 시스템 구성도 샘플 애플리케이션 샘플 애플리케이션은 Github 저장소 에 올라 있습니다.

준비



서비스 계정에 IAM 역할 추가




사용자 이름



minarai-cloud-composer
roles/composer.worker

minarai-cloud-composer
스토리지 객체 뷰어

minarai-composer-admin
roles/composer.admin

minarai-composer-admin
BigQuery 관리자


BigQuery 테이블 만들기


  • minarai-composer-admin 계정을 활성화하고 다음을 수행합니다
  • bq --location asia-northeast1 mk \
         --dataset \
         --description "exsample composer GCS to BQ" \
         ${PROJECT_ID}:exsample_composer_dataset
    
    bq --location asia-northeast1  mk -t \
          --schema ./dags/schemas/exsample_composer_table.json \
          --time_partitioning_type DAY \
          --description "example composer GCS to BQ" \
          ${PROJECT_ID}:exsample_composer_dataset.exsample_composer_table
    

    Cloud Storage 버킷 만들기


  • BigQuery Schema를 배치하는 버킷과 BiqQuery에 흐르는 데이터를 배치하는 버킷을 만듭니다.

    Cloud Composer 환경 구축 또는 환경 업데이트


  • Cloud Composer 환경 구축이 여전히 있는 경우 아래 명령의 updatecreate로 바꾸십시오.
    gcloud composer environments update ${CLOUD_ENV} \
          --location asia-northeast1 \
          --update-env-variables=PROJECT_ID=${PROJECT_ID},BQ_LOCATION=${LOCATION}
    

    DAG 등록(업로드)



    Cloud Composer 환경이 생성되면 GCS에 Cloud Composer dags 버킷이 생성됩니다.
    GCS 버킷에 DAG를 업로드하면 Airflow dags 디렉토리에 마운트됩니다.
    gcloud 명령에서 업로드하려면 다음 명령을 실행합니다.
    gcloud composer environments storage dags import \
            --environment ${CLOUD_ENV} \
            --location ${LOCATION} \
            --source ./dags/gcs_to_bq.py
    
  • 좋은 웹페이지 즐겨찾기