digdag의 td_wait_table>과 td_wait>를 사용해 보았습니다.

4336 단어 TreasureDatadigdag
digdag(정확하게는 TreasureData Workflow)의 td_wait_table> , td_wait> 를 사용해 보았으므로 그 메모입니다.

작업 흐름



이런 흐름으로 시험해 보았습니다.

1. td_wait_table> 에서 "wf_wait_target"이라는 테이블이 생길 때까지 기다린다.
2. 'wf_wait_target'에 쿼리하기
3. td_wait> 에서 "wf_check"테이블에 100개 이상의 레코드가 저장될 때까지 기다립니다.
4. wf_check 테이블의 내용을 export_wf_check로 내보내기

dig 파일, SQL은 다음과 같습니다.

dig 파일



td_wait.dig
timezone: Asia/Tokyo

schedule:
  daily>: 09:00:00

_export:
  td:
    database: test

+task_wait_table:
  td_wait_table>: wf_wait_target

+task1:
  td>: queries/count_wf_wait_target.sql

+task_wait:
  td_wait>: queries/check_record.sql

+task2:
  td>: queries/export_wf_check.sql
  create_table: export_wf_check


SQL



count_wf_wait_target.sql
SELECT
    count(1) as cnt
FROM
    wf_wait_target


check_record.sql
SELECT
    count(1) > 100
FROM
    wf_check


export_wf_check.sql
SELECT
    *
FROM
    wf_check


작업을 실행해보기



작성한 dig, SQL을 TreasureData Workflow에 Push 해 즉시 실행해 보겠습니다.
td wf push wf_sample
td wf start wf_sample td_wait --session now

여기에서 TreasureData Workflow 콘솔에서 확인할 것입니다.

td_wait_table



"wf_wait_target"이라는 테이블이 생길 때까지 기다리고 있습니다.
30초에 한 번 테이블의 존재를 확인하는 것을 볼 수 있습니다.



td_wait(레코드 수) 체크



"wf_wait_target"테이블을 만들면 다음으로 진행하여 check_record.sql을 실행하여 100 행 이상인지 확인합니다.
Timeline도 진행되고 있네요.



실행 완료



"wf_check"테이블에 10,000개의 행을 삽입하면 마지막 내보내기까지 성공적으로 완료되었습니다.



사이고에게



digdag의 문서에도 기재되어 있습니다만, td_wait_table는 단순한 테이블 유무 뿐만이 아니라 레코드수도 조건에 넣어지기 때문에 매우 사용하기 편해 보인다.
또, td_wait는 true를 돌려줄 때까지 기다리기 때문에, 단순한 건수 이외에서도 사용할 수 있는 것이 좋네요.

문서에는 쓰고 있지 않습니다만, 이 wait에 타임 아웃은 있는 것일까?

(digdag 문서)
* td_wait_table
* td_wait

좋은 웹페이지 즐겨찾기