파이썬과 Embulk로 여러 데이터 소스를 가로 지르는 소나기 ETL 만들기

최근, TreasureData와 자사의 DB의 데이터를 맞춘 집계를 배치 처리하는 것이 많아서 python으로 일단 csv에 내보내 Embulk로 업로드하고 있었습니다만, 데이터 소스가 늘어나면 귀찮아져 가기 때문에 보다 간단하고 범용화한 쓰는 방법을 생각했습니다.
결국은 WF 사용해 주면 좋은 이야기이므로 수요가 있을지 불명합니다만, WF 공부하는 것도 시간이 걸리므로 익숙한 언어로 촉촉하게 빨리 ETL을 만들고 싶은 사람을 위한 내용입니다.

전제 조건



이 기사가 적합한 사람


  • 평소에는 파이썬을 쓰고 있기 때문에 가능한 한 그 범위에서 ETL하고 싶습니다
  • pandas-td의 움직임이 너무 불안정하여 사용할 수 없습니다
  • 쉘에서의 실현 방법을 조사하는 시간도 없다

  • 달성하고 싶은 기능


  • SQL조차 만들면 배치를 추가 할 수있는 ETL 상자를 만듭니다
  • I/O 테이블이 변경 되더라도 번거로움이 없습니다
  • 기술 된 순서대로 처리가 실행되어 진행 상황이 통지됩니다.

    환경 준비



    Embulk 설치



    이미 존재하는 Tips에 자세히 설명되어 있습니다.
    Embulk용 플러그인 설치 input용과 output용으로 플러그인이 나뉘어져 있습니다. 설치는 다음과 같은 명령으로 간단합니다. $ embulk gem install embulk-input-td 이번에는 PostgreSQL에서 input하고 td에 output하고 싶으므로 embbulk-output-postgresql을 넣었습니다. 여기에서 플러그인을 찾을 수 있습니다. 소나기 ETL 구현 캡처 설정 yaml(liquid) 준비 변수에서 나중에 값을 변경할 수 있도록 하는 캡처 설정 파일을 만듭니다. 플러그인 페이지에 Example이 있으므로 기본 거기를 복사하여 자신의 환경에 맞춥니다. load_sample.yml.liquid in: type: postgresql host: 0.0.0.0 user: user_name password: pass database: db_name query: | {{ env.sql }} filters: - type: add_time to_column: {name: time, type: timestamp} from_value: {mode: upload_time} out: {type: td, apikey: xxxxxxxxx, endpoint: api.treasuredata.com, database: {{ env.o_db }}, table: {{ env.o_tbl }}, time_column: time, mode: append, default_timestamp_format: '%Y-%m-%d %H:%M:%S'} 동적 변수 설정 {{env.xxxx}}는 동적 값이 되며 환경 변수에서 참조됩니다. 나중에 파이썬에서 환경 변수를 설정하고 Embulk를 호출합니다. 여기에서는 호스트나 ID·PW는 고정하고 있습니다만, 동적으로 해도 좋고, 고정의 파라미터에서도 로그인 정보계는 .bashrc인가.bash_profile에 변수 등록해 두면 python으로부터 건네주지 않아도 자동 로 읽혀지기 때문에 yaml을 복수 만들 때의 메인터넌스성 및 시큐리티 향상으로 연결됩니다. 스키마 설정 정보 제 경우에는 output이 TreasureData이므로 스키마 설정이 없지만, 일반 DB에 낼 때는 out:파트에 schema를 설정하지 않으면 마음대로 public으로 설정되어 테이블을 찾을 수 없는 에러가 되므로 주의. ETL용 파이썬 준비 범용 부분 만들기 etl_batch.py import os def td2pg(o_db, o_tbl, sql): os.system('export o_db=' + o_db + ';export o_tbl=' + o_tbl + '; export sql="'+ sql.replace("\n", " ") + '"; embulk run /path_to_liquid_file/ load_sample.yml.liquid') yaml에서 설정할 예정인 동적 변수를 모두 추가하십시오. 여기까지 오면 나머지는 SQL을 만들어 설정한 함수를 부를 뿐입니다. ETL 처리 설명 SQL과 출력처 DB등의 동적 변수를 지정해 함수를 부르는 처리를 써 갈 뿐입니다. 어딘가에서 오류가 발생한 경우 진행 상황을 인쇄하십시오. etl_batch.py #task1 sql=""" select * from tbl """ o_db="output_db_name" o_tbl="output_tbl_name" td2pg(o_db, o_tbl, sql) print("task1 is done.") cron 등록(배치 가동시키고 싶은 인용) 마지막으로 cron으로 만든 파이썬을 정기 실행하는 설정을합니다. 쿨롱 (cron)을 만져보십시오.

    아래와 같이 메일 송신 설정을 해 두면, 실행시에 Embulk의 로그와 print한 내용이 보내져 오므로 에러시에 어디까지 진행했는지 쫓을 수 있습니다.
    MAILTO="[email protected]"
    30 15 * * * /path_to_python_file/etl_batch.py
    

    이 경우 매일 15시 30 분에 ETL 배치가 실행됩니다.

    yaml에서 .bashrc 환경 변수를 호출하는 경우



    .bashrc에 등록한 변수를 호출하는 경우 python을 실행하기 전에 변수를 활성화하는 설명이 필요합니다. cron의 등록 유저가 스스로라도 .bashrc 뿐이라면 에러가 되어, 풀 패스로 지정할 필요가 있으므로 주의.
    MAILTO="[email protected]"
    30 15 * * * source /user_home/.bashrc; /path_to_python_file/etl_batch.py
    

    마지막으로



    나는 SQL 내에서 동적으로 날짜를 지정하고 싶었기 때문에 파이썬에서 datetime도 import하여 날짜를 생성한다. 쉘에서도 할 수 있다고 생각합니다만 익숙한 언어 쪽이 빠르고, 도중에 python의 데이터 가공을 끼울 수도 있습니다.
    그리고는 큰 데이터에서도 Embulk가 마음대로 분산 처리해 주기 때문에 메모리 오버의 걱정도 없습니다.
    ...하지만 python 사용 이외에는 누가 얻은 내용이므로 누군가의 도움이되면 다행입니다.

    이상
  • 좋은 웹페이지 즐겨찾기