기류 모범 사례(翻譯)

4395 단어 airflow
https://airflow.apache.org/docs/apache-airflow/2.3.3/best-practices.html

Creating a new DAG is a two-step process:



writing Python code to create a DAG object,

testing if the code meets our expectations

이 자습서에서는 이 두 단계에 대한 모범 사례를 소개합니다.

DAG 작성



사용자 지정 연산자/후크 만들기(TODO)



미정

작업 만들기



Airflow Task 應該被視為資料庫的Transaction,這表示每個Task的結束都必須是完整的結果.每一次的re-run都能確保能重現一樣的結果.

以下介紹避免的無法重現結果的方法:
  • SQL INSERT를 사용하여 다시 실행하십시오.
  • 讀寫特定的區塊,而非是最new的資料.當有人在re-run之前新增資料就會導致資料不相同可以黥用像是 data_interval_start 指定時間範圍避圍.
  • 在Task內不應該使用 now() 這類的函式.尤其是用在重要的計算上,而導致task re-run會取得不一樣的結果的但如果Task是紀鞄log就沒關係.

  • 팁: 重複使用到的parameter像是connection_id 應該直接定義在default_args ,不需要個別定義在每個Task,可避免typo.Operator會自動取用對應的connection type.

    작업 삭제



    刪除Task,從Webservr會無法看到Task也導致無法看到태스크 로그.

    如果非必要,建議重建新的DAG.

    의사소통



    當用到 Kubernetes executor 或是 Celery executor 在不同機器執行Task,必定會面臨到資料傳遞的議題.

    기류XCom :
  • 小資料:直接透過 XCom 傳遞
  • 大資料:上傳到S3/HDFS,再透過 XCom 傳遞檔案路徑給Downstream Task.
  • 避免傳遞敏感資料(암호, 토큰),而是透過 Connection 保存.

  • 최상위 Python 코드



    You should avoid writing the top level code which is not necessary to create Operators and build DAG relations between them. This is because of the design decision for the scheduler of Airflow and the impact the top-level code parsing speed on both performance and scalability of Airflow.



    Python 호출 가능, 최상위 레벨 가져오기, 로컬 가져오기.

    예시: https://airflow.apache.org/docs/apache-airflow/2.3.3/best-practices.html#top-level-python-code

    동적 DAG 생성(TODO)



    미정

    기류 변수



    최상위 Python 코드에서 Airflow 변수를 사용합니다.

    변경 후 DAG 트리거



    當你對DAG資料夾內的任何檔案做異動,先確保scheduler已完成異動,不要馬上觸發DAG.

    以下系統參數根據可根據需求微調:
  • 스케줄러_유휴_수면_시간
  • min_file_process_interval
  • dag_dir_list_interval
  • 구문 분석 프로세스
  • file_parsing_sort_mode

  • 트리거 규칙이 있는 감시자 패턴의 예



    試想 試想 個 個 Dag 需求 需求, dag 內 任何 任何 태스크 是否 出錯 出錯 出錯 出錯, 必定 要 觸發 觸發 最後 一 個 작업 (例如 : 清理 資料 資料 資料 資料 資料 資料 資料 而 而 而 成功 成功 成功 與否 與否 的 定義 定義 取決於 取決於 잎 태스크, 這 導致 只 會 會 產出 成功 成功 取決於 只結果的DAG 실행, 讓user不容易掌握작업 실패 錯誤原因.



    발신자 example-of-watcher-pattern-with-trigger-rules

    透過設定 TriggerRule.ONE_FAILED ,可以讓最終的watcher task會因為任一個task的失敗狀態而最終讓DAG Run顯示失敗.

    참고: 트리거 규칙은 업스트림 작업과 관련이 있습니다.

    DAG 복잡성 감소


  • 關注 Top level Python Code 議題.可以透過 DAG Loader Test 觀察執行 時間.
  • DAG結構越簡單越好,task相依性要求會增加更多的處理時間.所以線性結構會好過於複雜的巢狀結構.
  • 單一檔案盡量放置少量的DAGs.

  • DAG 테스트



    DAG 로더 테스트



    實際的DAG 파싱 시간 = Python初始化時間 - 全部執行時間
  • Python初始化時間: time python -c ''
  • 全部執行時間:time python my-dag.py

  • 단위 테스트(TODO)



    미정

    자가진단



    透過task驗證 upstram task是否完成.

    예: 透過內建的S3KeySensor 驗證檔案是否已在S3落地.

    스테이징 환경



    透過環境變數等方法控制不同環境下的行為.

    모의 변수 및 연결(TODO)



    미정

    메타데이터 DB 유지보수



    定期清理不在需要的DAG, log紀錄.

    예:

    airflow db clean --clean-before-timestamp '2022-08-01 00:00:00+00:00' --dry-run
    


    업그레이드 및 다운그레이드(TODO)



    미정

    좋은 웹페이지 즐겨찾기