Digdag에서 ruby/python간에 변수 전달

6271 단어 Embulkdigdag
TreasureData의 새로운 OSS Digdag은 최근 공용 리포지토리가되었습니다.

이번에 게임서버 공부회 에서 초보자용 데모라든지 하려고 했으므로, 간단한 샘플을 소개합니다.

Digdag는 Operator를 이용하는 것으로 다양한 처리를 간단하게 부를 수 있게 되어 있습니다.
h tp // w w.ぢg다g. 이오 / 오페라와 rs. HTML

아직, TreasureData용의 것이 아직 많습니다만, 이 Operator가 늘어나면, 분석 엔진과의 제휴가 용이해질까 생각합니다. 만드는 방법은 그중 문서화되는 것이 아닐까 생각합니다.

그런데, 이번 소개에서는 CSV 파일을 Embulk로 PostgreSQL에 로드하고, Ruby로 집계하고, 그 결과를 Python에 건네주어, Slack 통지하는 워크플로우입니다.



워크플로 내용



아래는 전체 워크플로입니다.
_export 에서는 전체 워크플로에 대한 매개 변수를 설정합니다. Ruby 노스크립트를 사용하는 경우에는 여기에서 require 할 필요가 있는 것 같습니다.
+~ 의 ~ 부분은 임의의 이름을 부여할 수 있습니다. 그 아래에 오퍼레이터로서 embulk나 ruby나 python을 이용하고 있습니다.

mydag.dig
timezone: UTC

_export:
  rb:
    require: 'tasks/myworkflow'

+dataload:
  embulk>: demo/config.yml

+pg_calc:
  rb>: MyWorkflow.pg_calc

+slack:
  py>: tasks.MyWorkflow.slack

Ruby에서는 스크립트에서 MyWorkflow라는 클래스에 pg_calc라는 메서드를 정의합니다.
또 거기에서는 로컬의 PostgreSQL에 대해서 쿼리를 던져, 그 결과를 Digdag.env.store 에 대해서 값을 대입하고 있습니다.Digdag.env.store 를 사용하면 Diggag의 전체 워크플로에 변수를 전달할 수 있습니다.

myworkflow.rb
class MyWorkflow
  require "pg"

  def pg_calc
    conn = PGconn.connect('localhost',5432,'','','postgres','takahashi','')
    q    = "select count(1) as cnt from test"
    begin
      result  = conn.exec(q)
      Digdag.env.store(query_result: result[0]['cnt'])
    ensure
      conn.finish
    end
  end
end

Python 스크립트는 slack이라는 메소드의 인수에 Ruby에서 사용한 query_result를 인수로 전달합니다.
이렇게하면 Ruby에서 Python으로 변수를 전달하고 Slack에 COUNT 한 값을 알립니다.

__init__.py
class MyWorkflow(object):
    def __init__(self):
        pass

    def slack(self, session_time = None, query_result='0'):
        import requests
        import json
        requests.post('https://hooks.slack.com/services/XXXx/XXXX/XXXXX', data = json.dumps({
          'text': "レコード件数は{result}件です".format(result=query_result),
          'username': u'digdag',
          'icon_emoji': u':ghost:',
          'link_names': 1,
        }))


워크플로우 실행



설정한 워크플로우를 바탕으로 다음을 실행하면 처리가 시작되고 결국 Slack에 통지됩니다.
$digdag run mydag.dig



또한 Digdag에서 한 번 성공한 작업의 경우 digdag run을 수행해도 처리가 건너 뜁니다 (session_time을 설정한 경우는 다름).
따라서 다시 실행할 때는 digdag run mydag.dig --rerun 를 수행합니다.

간단하지만 변수를 전달하는 방법을 설명했습니다.
자세한 내용은 아래를 참조하십시오.
h tp // w w.ぢg다g. 이오/루 by_아피. HTML

좋은 웹페이지 즐겨찾기