Digdag에서 ruby/python간에 변수 전달
이번에 게임서버 공부회 에서 초보자용 데모라든지 하려고 했으므로, 간단한 샘플을 소개합니다.
Digdag는 Operator를 이용하는 것으로 다양한 처리를 간단하게 부를 수 있게 되어 있습니다.
h tp // w w.ぢg다g. 이오 / 오페라와 rs. HTML
아직, TreasureData용의 것이 아직 많습니다만, 이 Operator가 늘어나면, 분석 엔진과의 제휴가 용이해질까 생각합니다. 만드는 방법은 그중 문서화되는 것이 아닐까 생각합니다.
그런데, 이번 소개에서는 CSV 파일을 Embulk로 PostgreSQL에 로드하고, Ruby로 집계하고, 그 결과를 Python에 건네주어, Slack 통지하는 워크플로우입니다.
워크플로 내용
아래는 전체 워크플로입니다.
_export
에서는 전체 워크플로에 대한 매개 변수를 설정합니다. Ruby 노스크립트를 사용하는 경우에는 여기에서 require 할 필요가 있는 것 같습니다.
+~
의 ~ 부분은 임의의 이름을 부여할 수 있습니다. 그 아래에 오퍼레이터로서 embulk나 ruby나 python을 이용하고 있습니다.
mydag.digtimezone: UTC
_export:
rb:
require: 'tasks/myworkflow'
+dataload:
embulk>: demo/config.yml
+pg_calc:
rb>: MyWorkflow.pg_calc
+slack:
py>: tasks.MyWorkflow.slack
Ruby에서는 스크립트에서 MyWorkflow라는 클래스에 pg_calc라는 메서드를 정의합니다.
또 거기에서는 로컬의 PostgreSQL에 대해서 쿼리를 던져, 그 결과를 Digdag.env.store
에 대해서 값을 대입하고 있습니다.Digdag.env.store
를 사용하면 Diggag의 전체 워크플로에 변수를 전달할 수 있습니다.
myworkflow.rbclass MyWorkflow
require "pg"
def pg_calc
conn = PGconn.connect('localhost',5432,'','','postgres','takahashi','')
q = "select count(1) as cnt from test"
begin
result = conn.exec(q)
Digdag.env.store(query_result: result[0]['cnt'])
ensure
conn.finish
end
end
end
Python 스크립트는 slack이라는 메소드의 인수에 Ruby에서 사용한 query_result
를 인수로 전달합니다.
이렇게하면 Ruby에서 Python으로 변수를 전달하고 Slack에 COUNT 한 값을 알립니다.
__init__.pyclass MyWorkflow(object):
def __init__(self):
pass
def slack(self, session_time = None, query_result='0'):
import requests
import json
requests.post('https://hooks.slack.com/services/XXXx/XXXX/XXXXX', data = json.dumps({
'text': "レコード件数は{result}件です".format(result=query_result),
'username': u'digdag',
'icon_emoji': u':ghost:',
'link_names': 1,
}))
워크플로우 실행
설정한 워크플로우를 바탕으로 다음을 실행하면 처리가 시작되고 결국 Slack에 통지됩니다.
$digdag run mydag.dig
또한 Digdag에서 한 번 성공한 작업의 경우 digdag run을 수행해도 처리가 건너 뜁니다 (session_time을 설정한 경우는 다름).
따라서 다시 실행할 때는 digdag run mydag.dig --rerun
를 수행합니다.
간단하지만 변수를 전달하는 방법을 설명했습니다.
자세한 내용은 아래를 참조하십시오.
h tp // w w.ぢg다g. 이오/루 by_아피. HTML
Reference
이 문제에 관하여(Digdag에서 ruby/python간에 변수 전달), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/toru-takahashi/items/418eb5a4dda4e8555a00
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
timezone: UTC
_export:
rb:
require: 'tasks/myworkflow'
+dataload:
embulk>: demo/config.yml
+pg_calc:
rb>: MyWorkflow.pg_calc
+slack:
py>: tasks.MyWorkflow.slack
class MyWorkflow
require "pg"
def pg_calc
conn = PGconn.connect('localhost',5432,'','','postgres','takahashi','')
q = "select count(1) as cnt from test"
begin
result = conn.exec(q)
Digdag.env.store(query_result: result[0]['cnt'])
ensure
conn.finish
end
end
end
class MyWorkflow(object):
def __init__(self):
pass
def slack(self, session_time = None, query_result='0'):
import requests
import json
requests.post('https://hooks.slack.com/services/XXXx/XXXX/XXXXX', data = json.dumps({
'text': "レコード件数は{result}件です".format(result=query_result),
'username': u'digdag',
'icon_emoji': u':ghost:',
'link_names': 1,
}))
설정한 워크플로우를 바탕으로 다음을 실행하면 처리가 시작되고 결국 Slack에 통지됩니다.
$digdag run mydag.dig
또한 Digdag에서 한 번 성공한 작업의 경우 digdag run을 수행해도 처리가 건너 뜁니다 (session_time을 설정한 경우는 다름).
따라서 다시 실행할 때는
digdag run mydag.dig --rerun
를 수행합니다.간단하지만 변수를 전달하는 방법을 설명했습니다.
자세한 내용은 아래를 참조하십시오.
h tp // w w.ぢg다g. 이오/루 by_아피. HTML
Reference
이 문제에 관하여(Digdag에서 ruby/python간에 변수 전달), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/toru-takahashi/items/418eb5a4dda4e8555a00텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)