TreasureData Workflow (digdag)를 사용해 보았다 (초입문) [참고]

9931 단어 TreasureDatadigdag

TreasureData Workflow(digdag)란?



TreasureData가 개발한 워크플로 엔진입니다.
자세한 내용은 다음 문서를.

htps //w w.ぢg다g. 이오/
htp // // cs.ぢg다g. 이오/
htps : // / cs. t 어째서였다. 이 m / r ch c ぇ s / rkf ぉ ws

시작하기 전에



개요는 물론 읽어 두는 것으로, project나 session, attempt 되는 것이 나오고, 처음에는 약간 혼란하므로, 이하를 읽어 두면 좋을 것 같다

여기에서 copipe입니다.

Projects and revisions



In Digdag, workflows are packaged together with other files used in the workflows. The files can be anything such as SQL scripts, Python/Ruby/Shell scripts, configuration files, etc. This set of the workflow definitions is called project.

알겠습니다.

Sessions and attempts



A session is a plan to run a workflow which should complete successfully. An attempt is an actual execution of a session. A session has multiple attempts if you retry a failed workflow.

session은 하나의 워크 플로우 실행 단위이고 session과 attempts는 1 : n의 관계이므로

The reason why sessions and attempts are separated is that an execution may fail.

실패할 가능성이 있으므로 나누고 있으면.
알겠습니다.

실천



기본적으로는 여기

workflow를 정의하는 dig 파일 만들기



이런 느낌. 내용은 일일(7:00)에 sum1.sql을 실행하고 결과를 test_wf1 테이블에 넣은 다음 sum2.sql을 실행하여 test_wf2에 넣습니다.

job1.dig
timezone: Asia/Tokyo

schedule:
  daily>: 07:00:00

_export:
  td:
    database: test

+task1:
  td>: queries/sum1.sql
  create_table: test_wf1

+task2:
  td>: queries/sum2.sql
  create_table: test_wf2


SQL



SQL은 이런 느낌입니다. test1과 test2라는 테이블에 같은 SQL을 던지고 있습니다.

sum1.sql
SELECT
    created_by,
    count(distinct ip) ip,
    count(distinct user) uu,
    count(1) as logs
FROM
    test1
WHERE
    TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
    created_by

sum2.sql
SELECT
    created_by,
    count(distinct ip) ip,
    count(distinct user) uu,
    count(1) as logs
FROM
    test2
WHERE
    TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
    created_by

디렉토리 구성



SQL은 queries 아래에 넣습니다.
wf_sample/
├── job1.dig
└── queries
    ├── sum1.sql
    └── sum2.sql

로컬로 실행



TD의 workflow로 올리기 전에 로컬에서 실행해 봅시다.
$ td wf run job1
2016-12-08 20:15:49 +0900: Digdag v0.8.22
2016-12-08 20:16:02 +0900 [WARN] (main): Using a new session time 2016-12-08T00:00:00+09:00 based on schedule.
2016-12-08 20:16:02 +0900 [INFO] (main): Using session /xxx/xxx/digdag/wf_sample/.digdag/status/20161208T000000+0900.
2016-12-08 20:16:02 +0900 [INFO] (main): Starting a new session project id=1 workflow name=job1 session_time=2016-12-08T00:00:00+09:00
2016-12-08 20:16:04 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:05 +0900 [INFO] (0017@+job1+task1): td-client version: 0.7.26
2016-12-08 20:16:05 +0900 [INFO] (0017@+job1+task1): Logging initialized @16349ms
2016-12-08 20:16:06 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:07 +0900 [INFO] (0017@+job1+task1): Started presto job id=xxxxxxx:
DROP TABLE IF EXISTS "test_wf1";
CREATE TABLE "test_wf1" AS
SELECT
    created_by,
    count(distinct ip) ip,
    count(distinct user) uu,
    count(1) as logs
FROM
    test1
WHERE
    TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
    created_by

2016-12-08 20:16:09 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:13 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:17 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:26 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:28 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:29 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:31 +0900 [INFO] (0017@+job1+task2): Started presto job id=xxxxxxx:
DROP TABLE IF EXISTS "test_wf2";
CREATE TABLE "test_wf2" AS
SELECT
    created_by,
    count(distinct ip) ip,
    count(distinct user) uu,
    count(1) as logs
FROM
    test2
WHERE
    TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
    created_by

2016-12-08 20:16:33 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:36 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
Success. Task state is saved at /xxx/xxx/digdag/wf_sample/.digdag/status/20161208T000000+0900 directory.

제대로 집계 된 데이터가 들어 있습니다.



TD workflow로 push



제대로 작동하는지 확인할 수 있으므로 정의한 작업을 푸시합니다.
$ td wf push wf_sample
2016-12-08 20:09:23 +0900: Digdag v0.8.22
Creating .digdag/tmp/archive-5532475972498044837.tar.gz...
  Archiving job1.dig
  Archiving queries/sum1.sql
  Archiving queries/sum2.sql
Workflows:
  job1

Uploaded:
  id: xxxxxxx
  name: wf_sample
  revision: 36b26035-872d-419e-bed8-ab665d5996d9
  archive type: db
  project created at: 2016-12-08T11:09:43Z
  revision updated at: 2016-12-08T11:09:43Z

업된 것 같습니다. 커맨드 라인에서도 확인할 수 있지만, UI도 있으므로 그곳에서 확인해 봅시다.



좋은 느낌이네요. 다음 번의 실행 시간도 있습니다.

TD workflow에서 실행



push에 성공했기 때문에 TD workflow (서버 측)에서 실행해 보겠습니다.
$ td wf start wf_sample job1 --session now 
2016-12-08 20:22:03 +0900: Digdag v0.8.22
Started a session attempt:
  session id: xxxxxx
  attempt id: xxxxxx
  uuid: 4608a620-b8c2-48f9-beb3-209f0b203d44
  project: wf_sample
  workflow: job1
  session time: 2016-12-08 20:22:15 +0900
  retry attempt name: 
  params: {"last_session_time":"2016-12-08T00:00:00+09:00","next_session_time":"2016-12-09T00:00:00+09:00"}
  created at: 2016-12-08 20:22:26 +0900

* Use `td workflow session xxxxxx` to show session status.
* Use `td workflow task xxxxxx` and `td workflow log 613218` to show task status and logs.

이쪽도 관리 화면에서 확인해 본다. Sessions에 한 행 추가되었으며 Status가 Success입니다.



상세 화면에서는 작업 하나 하나의 상황도 보입니다.



UI가 꽤 좋은 느낌입니다.
좀 더 사용해 보았습니다만 써 피곤했기 때문에 일단 여기서 종료.

digdag와는 달리, TD의 workflow의 경우 쉘등의 TD의 쿼리 이외는 지금까지 할 수 없지만, 그래도 TD내에서 완결하는 것도 있으므로, 약간의 것이라면 이것으로 충분한 생각이 듭니다.

좋은 웹페이지 즐겨찾기