TreasureData Workflow (digdag)를 사용해 보았다 (초입문) [참고]
9931 단어 TreasureDatadigdag
TreasureData Workflow(digdag)란?
TreasureData가 개발한 워크플로 엔진입니다.
자세한 내용은 다음 문서를.
htps //w w.ぢg다g. 이오/
htp // // cs.ぢg다g. 이오/
htps : // / cs. t 어째서였다. 이 m / r ch c ぇ s / rkf ぉ ws
시작하기 전에
개요는 물론 읽어 두는 것으로, project나 session, attempt 되는 것이 나오고, 처음에는 약간 혼란하므로, 이하를 읽어 두면 좋을 것 같다
여기에서 copipe입니다.
Projects and revisions
In Digdag, workflows are packaged together with other files used in the workflows. The files can be anything such as SQL scripts, Python/Ruby/Shell scripts, configuration files, etc. This set of the workflow definitions is called project.
알겠습니다.
Sessions and attempts
A session is a plan to run a workflow which should complete successfully. An attempt is an actual execution of a session. A session has multiple attempts if you retry a failed workflow.
session은 하나의 워크 플로우 실행 단위이고 session과 attempts는 1 : n의 관계이므로
The reason why sessions and attempts are separated is that an execution may fail.
실패할 가능성이 있으므로 나누고 있으면.
알겠습니다.
실천
기본적으로는 여기
workflow를 정의하는 dig 파일 만들기
이런 느낌. 내용은 일일(7:00)에 sum1.sql을 실행하고 결과를 test_wf1 테이블에 넣은 다음 sum2.sql을 실행하여 test_wf2에 넣습니다.
job1.digtimezone: Asia/Tokyo
schedule:
daily>: 07:00:00
_export:
td:
database: test
+task1:
td>: queries/sum1.sql
create_table: test_wf1
+task2:
td>: queries/sum2.sql
create_table: test_wf2
SQL
SQL은 이런 느낌입니다. test1과 test2라는 테이블에 같은 SQL을 던지고 있습니다.
sum1.sqlSELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test1
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
sum2.sqlSELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test2
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
디렉토리 구성
SQL은 queries 아래에 넣습니다.
wf_sample/
├── job1.dig
└── queries
├── sum1.sql
└── sum2.sql
로컬로 실행
TD의 workflow로 올리기 전에 로컬에서 실행해 봅시다.
$ td wf run job1
2016-12-08 20:15:49 +0900: Digdag v0.8.22
2016-12-08 20:16:02 +0900 [WARN] (main): Using a new session time 2016-12-08T00:00:00+09:00 based on schedule.
2016-12-08 20:16:02 +0900 [INFO] (main): Using session /xxx/xxx/digdag/wf_sample/.digdag/status/20161208T000000+0900.
2016-12-08 20:16:02 +0900 [INFO] (main): Starting a new session project id=1 workflow name=job1 session_time=2016-12-08T00:00:00+09:00
2016-12-08 20:16:04 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:05 +0900 [INFO] (0017@+job1+task1): td-client version: 0.7.26
2016-12-08 20:16:05 +0900 [INFO] (0017@+job1+task1): Logging initialized @16349ms
2016-12-08 20:16:06 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:07 +0900 [INFO] (0017@+job1+task1): Started presto job id=xxxxxxx:
DROP TABLE IF EXISTS "test_wf1";
CREATE TABLE "test_wf1" AS
SELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test1
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
2016-12-08 20:16:09 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:13 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:17 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:26 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:28 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:29 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:31 +0900 [INFO] (0017@+job1+task2): Started presto job id=xxxxxxx:
DROP TABLE IF EXISTS "test_wf2";
CREATE TABLE "test_wf2" AS
SELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test2
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
2016-12-08 20:16:33 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:36 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
Success. Task state is saved at /xxx/xxx/digdag/wf_sample/.digdag/status/20161208T000000+0900 directory.
제대로 집계 된 데이터가 들어 있습니다.
TD workflow로 push
제대로 작동하는지 확인할 수 있으므로 정의한 작업을 푸시합니다.
$ td wf push wf_sample
2016-12-08 20:09:23 +0900: Digdag v0.8.22
Creating .digdag/tmp/archive-5532475972498044837.tar.gz...
Archiving job1.dig
Archiving queries/sum1.sql
Archiving queries/sum2.sql
Workflows:
job1
Uploaded:
id: xxxxxxx
name: wf_sample
revision: 36b26035-872d-419e-bed8-ab665d5996d9
archive type: db
project created at: 2016-12-08T11:09:43Z
revision updated at: 2016-12-08T11:09:43Z
업된 것 같습니다. 커맨드 라인에서도 확인할 수 있지만, UI도 있으므로 그곳에서 확인해 봅시다.
좋은 느낌이네요. 다음 번의 실행 시간도 있습니다.
TD workflow에서 실행
push에 성공했기 때문에 TD workflow (서버 측)에서 실행해 보겠습니다.
$ td wf start wf_sample job1 --session now
2016-12-08 20:22:03 +0900: Digdag v0.8.22
Started a session attempt:
session id: xxxxxx
attempt id: xxxxxx
uuid: 4608a620-b8c2-48f9-beb3-209f0b203d44
project: wf_sample
workflow: job1
session time: 2016-12-08 20:22:15 +0900
retry attempt name:
params: {"last_session_time":"2016-12-08T00:00:00+09:00","next_session_time":"2016-12-09T00:00:00+09:00"}
created at: 2016-12-08 20:22:26 +0900
* Use `td workflow session xxxxxx` to show session status.
* Use `td workflow task xxxxxx` and `td workflow log 613218` to show task status and logs.
이쪽도 관리 화면에서 확인해 본다. Sessions에 한 행 추가되었으며 Status가 Success입니다.
상세 화면에서는 작업 하나 하나의 상황도 보입니다.
UI가 꽤 좋은 느낌입니다.
좀 더 사용해 보았습니다만 써 피곤했기 때문에 일단 여기서 종료.
digdag와는 달리, TD의 workflow의 경우 쉘등의 TD의 쿼리 이외는 지금까지 할 수 없지만, 그래도 TD내에서 완결하는 것도 있으므로, 약간의 것이라면 이것으로 충분한 생각이 듭니다.
Reference
이 문제에 관하여(TreasureData Workflow (digdag)를 사용해 보았다 (초입문) [참고]), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/skryoooo/items/7363e9cf0910f21eb9d9
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
개요는 물론 읽어 두는 것으로, project나 session, attempt 되는 것이 나오고, 처음에는 약간 혼란하므로, 이하를 읽어 두면 좋을 것 같다
여기에서 copipe입니다.
Projects and revisions
In Digdag, workflows are packaged together with other files used in the workflows. The files can be anything such as SQL scripts, Python/Ruby/Shell scripts, configuration files, etc. This set of the workflow definitions is called project.
알겠습니다.
Sessions and attempts
A session is a plan to run a workflow which should complete successfully. An attempt is an actual execution of a session. A session has multiple attempts if you retry a failed workflow.
session은 하나의 워크 플로우 실행 단위이고 session과 attempts는 1 : n의 관계이므로
The reason why sessions and attempts are separated is that an execution may fail.
실패할 가능성이 있으므로 나누고 있으면.
알겠습니다.
실천
기본적으로는 여기
workflow를 정의하는 dig 파일 만들기
이런 느낌. 내용은 일일(7:00)에 sum1.sql을 실행하고 결과를 test_wf1 테이블에 넣은 다음 sum2.sql을 실행하여 test_wf2에 넣습니다.
job1.digtimezone: Asia/Tokyo
schedule:
daily>: 07:00:00
_export:
td:
database: test
+task1:
td>: queries/sum1.sql
create_table: test_wf1
+task2:
td>: queries/sum2.sql
create_table: test_wf2
SQL
SQL은 이런 느낌입니다. test1과 test2라는 테이블에 같은 SQL을 던지고 있습니다.
sum1.sqlSELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test1
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
sum2.sqlSELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test2
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
디렉토리 구성
SQL은 queries 아래에 넣습니다.
wf_sample/
├── job1.dig
└── queries
├── sum1.sql
└── sum2.sql
로컬로 실행
TD의 workflow로 올리기 전에 로컬에서 실행해 봅시다.
$ td wf run job1
2016-12-08 20:15:49 +0900: Digdag v0.8.22
2016-12-08 20:16:02 +0900 [WARN] (main): Using a new session time 2016-12-08T00:00:00+09:00 based on schedule.
2016-12-08 20:16:02 +0900 [INFO] (main): Using session /xxx/xxx/digdag/wf_sample/.digdag/status/20161208T000000+0900.
2016-12-08 20:16:02 +0900 [INFO] (main): Starting a new session project id=1 workflow name=job1 session_time=2016-12-08T00:00:00+09:00
2016-12-08 20:16:04 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:05 +0900 [INFO] (0017@+job1+task1): td-client version: 0.7.26
2016-12-08 20:16:05 +0900 [INFO] (0017@+job1+task1): Logging initialized @16349ms
2016-12-08 20:16:06 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:07 +0900 [INFO] (0017@+job1+task1): Started presto job id=xxxxxxx:
DROP TABLE IF EXISTS "test_wf1";
CREATE TABLE "test_wf1" AS
SELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test1
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
2016-12-08 20:16:09 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:13 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:17 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:26 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:28 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:29 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:31 +0900 [INFO] (0017@+job1+task2): Started presto job id=xxxxxxx:
DROP TABLE IF EXISTS "test_wf2";
CREATE TABLE "test_wf2" AS
SELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test2
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
2016-12-08 20:16:33 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:36 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
Success. Task state is saved at /xxx/xxx/digdag/wf_sample/.digdag/status/20161208T000000+0900 directory.
제대로 집계 된 데이터가 들어 있습니다.
TD workflow로 push
제대로 작동하는지 확인할 수 있으므로 정의한 작업을 푸시합니다.
$ td wf push wf_sample
2016-12-08 20:09:23 +0900: Digdag v0.8.22
Creating .digdag/tmp/archive-5532475972498044837.tar.gz...
Archiving job1.dig
Archiving queries/sum1.sql
Archiving queries/sum2.sql
Workflows:
job1
Uploaded:
id: xxxxxxx
name: wf_sample
revision: 36b26035-872d-419e-bed8-ab665d5996d9
archive type: db
project created at: 2016-12-08T11:09:43Z
revision updated at: 2016-12-08T11:09:43Z
업된 것 같습니다. 커맨드 라인에서도 확인할 수 있지만, UI도 있으므로 그곳에서 확인해 봅시다.
좋은 느낌이네요. 다음 번의 실행 시간도 있습니다.
TD workflow에서 실행
push에 성공했기 때문에 TD workflow (서버 측)에서 실행해 보겠습니다.
$ td wf start wf_sample job1 --session now
2016-12-08 20:22:03 +0900: Digdag v0.8.22
Started a session attempt:
session id: xxxxxx
attempt id: xxxxxx
uuid: 4608a620-b8c2-48f9-beb3-209f0b203d44
project: wf_sample
workflow: job1
session time: 2016-12-08 20:22:15 +0900
retry attempt name:
params: {"last_session_time":"2016-12-08T00:00:00+09:00","next_session_time":"2016-12-09T00:00:00+09:00"}
created at: 2016-12-08 20:22:26 +0900
* Use `td workflow session xxxxxx` to show session status.
* Use `td workflow task xxxxxx` and `td workflow log 613218` to show task status and logs.
이쪽도 관리 화면에서 확인해 본다. Sessions에 한 행 추가되었으며 Status가 Success입니다.
상세 화면에서는 작업 하나 하나의 상황도 보입니다.
UI가 꽤 좋은 느낌입니다.
좀 더 사용해 보았습니다만 써 피곤했기 때문에 일단 여기서 종료.
digdag와는 달리, TD의 workflow의 경우 쉘등의 TD의 쿼리 이외는 지금까지 할 수 없지만, 그래도 TD내에서 완결하는 것도 있으므로, 약간의 것이라면 이것으로 충분한 생각이 듭니다.
Reference
이 문제에 관하여(TreasureData Workflow (digdag)를 사용해 보았다 (초입문) [참고]), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/skryoooo/items/7363e9cf0910f21eb9d9
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
timezone: Asia/Tokyo
schedule:
daily>: 07:00:00
_export:
td:
database: test
+task1:
td>: queries/sum1.sql
create_table: test_wf1
+task2:
td>: queries/sum2.sql
create_table: test_wf2
SELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test1
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
SELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test2
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
wf_sample/
├── job1.dig
└── queries
├── sum1.sql
└── sum2.sql
$ td wf run job1
2016-12-08 20:15:49 +0900: Digdag v0.8.22
2016-12-08 20:16:02 +0900 [WARN] (main): Using a new session time 2016-12-08T00:00:00+09:00 based on schedule.
2016-12-08 20:16:02 +0900 [INFO] (main): Using session /xxx/xxx/digdag/wf_sample/.digdag/status/20161208T000000+0900.
2016-12-08 20:16:02 +0900 [INFO] (main): Starting a new session project id=1 workflow name=job1 session_time=2016-12-08T00:00:00+09:00
2016-12-08 20:16:04 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:05 +0900 [INFO] (0017@+job1+task1): td-client version: 0.7.26
2016-12-08 20:16:05 +0900 [INFO] (0017@+job1+task1): Logging initialized @16349ms
2016-12-08 20:16:06 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:07 +0900 [INFO] (0017@+job1+task1): Started presto job id=xxxxxxx:
DROP TABLE IF EXISTS "test_wf1";
CREATE TABLE "test_wf1" AS
SELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test1
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
2016-12-08 20:16:09 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:13 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:17 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:26 +0900 [INFO] (0017@+job1+task1): td>: queries/sum1.sql
2016-12-08 20:16:28 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:29 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:31 +0900 [INFO] (0017@+job1+task2): Started presto job id=xxxxxxx:
DROP TABLE IF EXISTS "test_wf2";
CREATE TABLE "test_wf2" AS
SELECT
created_by,
count(distinct ip) ip,
count(distinct user) uu,
count(1) as logs
FROM
test2
WHERE
TD_TIME_RANGE(time, '2016-12-08 00:00:00','2016-12-08 01:00:00')
GROUP BY
created_by
2016-12-08 20:16:33 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
2016-12-08 20:16:36 +0900 [INFO] (0017@+job1+task2): td>: queries/sum2.sql
Success. Task state is saved at /xxx/xxx/digdag/wf_sample/.digdag/status/20161208T000000+0900 directory.
$ td wf push wf_sample
2016-12-08 20:09:23 +0900: Digdag v0.8.22
Creating .digdag/tmp/archive-5532475972498044837.tar.gz...
Archiving job1.dig
Archiving queries/sum1.sql
Archiving queries/sum2.sql
Workflows:
job1
Uploaded:
id: xxxxxxx
name: wf_sample
revision: 36b26035-872d-419e-bed8-ab665d5996d9
archive type: db
project created at: 2016-12-08T11:09:43Z
revision updated at: 2016-12-08T11:09:43Z
$ td wf start wf_sample job1 --session now
2016-12-08 20:22:03 +0900: Digdag v0.8.22
Started a session attempt:
session id: xxxxxx
attempt id: xxxxxx
uuid: 4608a620-b8c2-48f9-beb3-209f0b203d44
project: wf_sample
workflow: job1
session time: 2016-12-08 20:22:15 +0900
retry attempt name:
params: {"last_session_time":"2016-12-08T00:00:00+09:00","next_session_time":"2016-12-09T00:00:00+09:00"}
created at: 2016-12-08 20:22:26 +0900
* Use `td workflow session xxxxxx` to show session status.
* Use `td workflow task xxxxxx` and `td workflow log 613218` to show task status and logs.
Reference
이 문제에 관하여(TreasureData Workflow (digdag)를 사용해 보았다 (초입문) [참고]), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/skryoooo/items/7363e9cf0910f21eb9d9텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)