Cloud Composer Tips

여러분 안녕하세요. @best_not_best 입니다.

올해는 Cloud Composer 을 이용하는 경우가 많았기 때문에, 거기서 얻은 Tips를 정리하고 싶습니다.
덧붙여 아래에 기재된 코드는 Python 3에서의 동작을 상정하고 있습니다.

지원되는 Python 버전



2.7.15와 3.6.6입니다.
h tps : // c ぉ d. 오, ぇ. 코 m / 코 m 포세 r / 도 cs / 콘세 pts / py 텐 ゔ ぇ r 시온

작업 실패 시 로깅/Monitoring 감지



태스크가 실패했을 때의 로깅에의 로그의 떨어지는 방법이 좀처럼 알기 어렵습니다.AirflowExceptionraise 하는 태스크를 준비해, 이것을 각 태스크의 뒤에 one_failed 로 실행시켜, 이 때의 메세지를 검지하는 것이 간단할까 생각합니다.

샘플 프로그램



sample_1.py
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""sample."""

from airflow import AirflowException
from airflow.operators.python_operator import PythonOperator

def output_error():
    """output error."""
    raise AirflowException('task failed.')

# (省略)

with models.DAG(
    dag_id='sample_001',
    # (省略)
) as dag:
    task_1 = # (省略)
    task_2 = # (省略)
    error_task = PythonOperator(
        task_id='error_task',
        python_callable=output_error,
        retries=0,
        trigger_rule='one_failed',
        dag=dag,
    )

    task_1 >> error_task
    task_2 >> error_task
    task_1 >> task_2

로깅의 지표



아래에서 측정항목을 만듭니다.
resource.type="cloud_composer_environment"
logName=projects/<プロジェクトID>/logs/airflow-worker
labels.workflow:sample_001
("task failed.")



Monitoring의 경고 정책



위 측정항목을 대상으로 하는 경고 정책을 만듭니다.




오류 로그



로깅에서는 조금 알기 어려우므로, Composer 버킷 부하의 로그를 알기 쉽다고 생각합니다.



작업 아이디어


PythonOperator 어느 정도 자유롭게 처리를 기술할 수 있습니다만, 준비되어 있는 Operator를 최대한 사용하는 것이 처리 속도가 빠릅니다.
htps : //아이 rfぉw. 아파치. 오 rg/도 cs/아파치ぇ-아이 rfぉw/s타 bぇ/_아피/아이 rfぉw/오페라와 rs/그리고 x. HTML
htps : //아이 rfぉw. 아파치. 오 rg/도 cs/아파치ぇ-아이 rfぉw/s타 bぇ/_아피/아이 rfぉw/안 tb/오페라와 rs/그리고 x. HTML

예를 들어 BigQuery의 데이터를 Cloud Storage에 CSV 파일을 출력하는 프로세스라면,
  • PythonOperator 에서 BigQuery 데이터를 쿼리로 검색
  • PythonOperator에서 얻은 데이터를 CSV 파일로 만들고 로컬로 저장
  • PythonOperator에서 CSV 파일을 Cloud Storage에 복사

  • 대신
  • BigQueryOperator 에서 쿼리 실행 결과를 BigQuery에 테이블 저장
  • BigQueryToCloudStorageOperator 에서 테이블의 데이터를 Cloud Storage로 출력

  • BigQuery나 Cloud Storage 측에서 처리를 수행하므로 실행 시간이 빨라집니다.
    (전자는 데이터수에 따라서는 처리가 끝나지 않는 것도・・・.)

    VPC 서비스 제어



    경계 안에 둘 수 있습니다. 하지만 Python 패키지의 설치나 외부 시스템과의 연동시의 설정이 복잡해지는 등의 이유로 개인적으로는 이용하고 있지 않습니다.
    BigQuery 또는 Cloud Storage만으로도 경계 내에 배치하려는 경우,
  • BigQuery 및 Cloud Storage를 처리하는 프로젝트 A (경계 내)
  • Cloud Composer를 처리하는 프로젝트 B(경계 밖)

  • 와 같이 프로젝트를 나누고 Cloud Composer를 실행하는 서비스 계정에서 액세스를 프로젝트 A에서 허용하도록 하면 나중에 즐겁게 생각합니다.



    또 여담입니다만, 보안 향상을 위해라고는 해도 FW 룰로 내림을 전 차단해 버리면, 스케줄러등이 움직이지 않게 되므로 주의해 주세요. ( 테넌트 프로젝트 와 통신하고 있기 때문에?)
    가능한 한 내리는 열린 채로 하는 것이 좋을까 생각합니다.

    헬스 체크



    이하 참고해 주십시오.
    htps : // c ぉ d. 오, ぇ. 코 m / 코 m 포세 r / 두 cs / 쓰리 아 ls / 헤아 lth-chi ck

    요약


  • 태스크 실패 감지는 탐지 작업을 준비합니다
  • 제공된 Operator를 최대한 활용하십시오
  • 과도한 VPC 서비스 제어 및 FW 규칙의 액세스 제한은 유지 관리 운영을 복잡하게 할 수 있습니다.

    이상, 참고가 되면 다행입니다!
  • 좋은 웹페이지 즐겨찾기