Dataform에서 BigQuery 데이터 파이프라인 구축

이 기사는 BigQuery Advent Calendar 2020 24일째 보도다.
https://qiita.com/advent-calendar/2020/bigquery
BigQuery의 COVID-19 일반 공개 데이터 세트를 바탕으로 Google Cloud 패밀리의 일원인 Dataform에서 BigQuery의 데이터 파이프라인을 조합해 보았습니다.

Dataform은


https://dataform.co/
Dataform은 기존 ETL 파이프라인이 아니라 ELT 파이프라인에 유용한 도구입니다.
ELT 파이프라인은 데이터 소스에서 추출(Extract)한 원본 데이터를 데이터 소프트웨어 하우스(DWH)(Load)로 직접 읽고 DWH 내에서 목적에 맞는 테이블 구조(Transform)로 변환하는 일련의 프로세스를 말합니다.Dataform은 ELT 파이프라인에서 Transform을 특화하는 도구입니다.

구글클라우드 산하


구글클라우드는 2020년 12월 8일(미국시간) 데이터폼브로드캐스트을 인수했다.동시에 모든 사용자는 데이터 폼을 무료로 사용할 수 있다.
Google Cloud에서 Dataform을 사용할 수 있어서 기쁩니다.우리는 전체 조직의 모든 사용자가 정보를 분석할 수 있도록 하는 사명을 계속 집행할 것이다.이번에는 모든 이용자에게 Dataform 서비스를 무료로 제공하기로 했다.앞으로 Dataform과 BigQuery의 우수한 기능을 조합하여 사용할 수 있기를 기대합니다.

실제로 시험해 보다


이번에 손수건으로 만든 Dataform의 소스 코드가 여기 있습니다.
https://github.com/OTA2000/dataform_advent_calendar

전제 조건

  • 소스COVID-19 Public Datasets로 사용(일반 공개 데이터)
  • 대부분의 용례는 빅Query에 원본 데이터를 저장하는 파이프라인을 미리 설정해야 한다(이번 생략)
  • 이번 일

  • Dataform 프로젝트 시작
  • 소스 데이터를 정의하는 SQLX 만들기
  • 뷰를 정의하는 SQLX 생성
  • 파티션 테이블을 정의하는 SQLX 만들기
  • 일반 테이블을 정의하는 SQLX 만들기
  • 표별 의존관계 확인
  • 일정을 정하고 제때에 집행
  • Dataform 프로젝트 시작


    [GCC] 서비스 계정 만들기


    대상 GCP 프로젝트에서 서비스 계정을 만들고 BigQuery 관리자 역할을 부여합니다.


    서비스 계정을 만들면 JSON의 비밀 키가 다운로드되어 보관됩니다.
    ※ Dataform에서 발행된 BigQuery의 의뢰는 모두 해당 서비스 계정에서 수행됩니다.

    사용자 계정 만들기


    다음 링크에서 데이터form을 만드는 사용자 계정
    https://app.dataform.co/

    [Dataform] 프로젝트 만들기


    서명 완료 후새 프로젝트 작성.
  • 항목 이름 입력

  • 연결 객체의 GCP 프로젝트 ID를 입력합니다.

  • 데이터 세트를 만들 위치와 서비스 계정 키를 지정하여 BigQuery 연결

  • (선택 가능) 모든 Giit 공급자가 버전 관리


    Dataform에 생성된 소스 코드는 Giit에서 버전 관리를 수행합니다.Giit 관리 제공자를 임의(GiitHub, GiitLab, Azure DevOps)로 이전할 수도 있다.
    Giit 공급자를 이전하려면 왼쪽 창의 프로젝트 settings > Version control에서 설정하십시오. (빈 창고와 방문 영패를 따로 준비해야 합니다.)

    이른바 SQLX


    Dataform에서 SQLX 형식으로 테이블과 뷰를 정의합니다.먼저 SQLX의 개요와 표기법을 이해하십시오.
    https://docs.dataform.co/introduction/dataform-in-5-minutes
    https://docs.dataform.co/guides/sqlx

    소스 데이터를 정의하는 SQLX 만들기


    SQLX를 사용하여 다음 테이블을 소스 데이터로 정의합니다.
  • bigquery-public-data.covid19_open_data.covid19_open_data
  • bigquery-public-data.covid19_public_forecasts.japan_prefecture_28d
  • 예: bigquery-public-data.covid19_open_data.covid19_open_data를 소스 데이터로 정의하는 SQLX
    definitions/declaration/covid19_open_data.sqlx
    config {
      type: "declaration",
      database: "bigquery-public-data",
      schema: "covid19_open_data",
      name: "covid19_open_data"
    }
    
    config 블록의 type에'declation'을 지정하여 읽기 원본의 프로젝트 ID(database), 데이터 집합 이름(schema), 표 이름(name)을 정의합니다.이 설명을 통해 테이블이나 보기를 만드는 SQLX에서 이 테이블을 호출하거나 데이터 form의 의존 관계를 설명할 수 있습니다.

    뷰를 정의하는 SQLX 작성


    SQLX에서는 본도부현에 따라 신규 감염자와 신규 사망자를 산정하는 뷰를 정의해 빅큐리에 창설했다.
    definitions/japan_actual.sqlx
    config {
      type: "view",
      schema: "covid19",
      name: "japan_actual",
      description: "都道府県別感染状況(日別)",
      columns: {
        date: "日",
        prefecture_code: "都道府県コード",
        new_confirmed: "新規感染者数",
        new_deaths: "新規死者数"
      }
    }
    
    # standardSQL
    SELECT
      date,
      REPLACE(location_key, "_", "-") AS prefecture_code,
      SUM(new_confirmed) AS new_confirmed,
      SUM(new_deceased) AS new_deaths
    FROM
      ${ref("covid19_open_data")}
    WHERE
      country_code = "JP"
      AND location_key <> "JP"
    GROUP BY
      date, prefecture_code
    
    
    config 블록type에'view'를 지정하여 보기의 설명(description)과 열의 설명(columns)을 정의합니다.SQL 블록에는 뷰에서 사용되는 표준 SQL이 설명되어 있습니다.
    FROM 문에서 방금 발표된 소스 데이터를 ${ref("covid19_open_data")} 형식으로 호출합니다.이 형식으로 표 인용을 기술함으로써 데이터form의 의존 관계를 해석할 수 있다.

    SQLX를 작성하면 오른쪽 창에 뷰의 작성 위치나 종속성bigquery-public-data.covid19_open_data.covid19_open_data, 컴파일된 질의${ref("covid19_open_data")} 내용 등이 표시됩니다.PREVIEW RESULTS에서 조회 결과를 확인하고(실제로 BigQuery에서 조회 작업을 발행할 때 요금 등을 주의해야 함) CREATE VIEW에서 보기를 만들 수 있습니다.

    파티션 테이블을 정의하는 SQLX 만들기

    bigquery-public-data.covid19_public_forecasts.japan_prefecture_28d부터 분구표에 각 도도부현의 감염 상황 예측(일별)을 작성하는 SQLX를 창설한다.
    definitions/japan_forecast.sqlx
    config {
      type: "incremental",
      schema: "covid19",
      name: "japan_forecast",
      description: "都道府県別感染状況予測(日別)",
      columns: {
        prefecture_code: "都道府県コード",
        prefecture_name_kanji: "都道府県名",
        forecast_date: "予測実施日",
        prediction_date: "予測対象日",
        new_confirmed: "新規感染者数予測",
        new_deaths: "新規死者数予測"
      },
      bigquery: {
        partitionBy: "prediction_date"
      },
      tags: ["daily"]
    }
    
    # standardSQL
    SELECT
      prefecture_code,
      prefecture_name_kanji,
      forecast_date,  -- 予測実施日
      prediction_date,  -- 予測対象日
      new_confirmed,
      new_deaths,
    FROM
      ${ref("japan_prefecture_28d")}
    ${ when(incremental(), `WHERE forecast_date > (SELECT MAX(forecast_date) FROM ${self()})`) }
    
    
    config 블록type에서 "increamental"을 지정하면 섹션 테이블을 만들 수 있습니다.구역에 대한 세부 사항bigquery에서 지정partitionBy 등.
    처음 또는 전체 새로 고침을 수행할 때 다음 질의 작업이 BigQuery에서 릴리즈되고 새 테이블이 작성되도록 지정합니다.
    create or replace table `dataform-advent-calendar.covid19.japan_forecast` partition by prediction_date as
    
    # standardSQL
    SELECT
      prefecture_code,
      prefecture_name_kanji,
      forecast_date,  -- 予測実施日
      prediction_date,  -- 予測対象日
      new_confirmed,
      new_deaths,
    FROM
      `bigquery-public-data.covid19_public_forecasts.japan_prefecture_28d`
    
    
    두 번째 이후에 실행할 때 다음과 같은 조회 작업을 발행하여 INSERT를 진행한다.
    insert into `dataform-advent-calendar.covid19.japan_forecast`
    (prefecture_code,prefecture_name_kanji,forecast_date,prediction_date,new_confirmed,new_deaths)
    select prefecture_code,prefecture_name_kanji,forecast_date,prediction_date,new_confirmed,new_deaths
    from (
    
    # standardSQL
    SELECT
      prefecture_code,
      prefecture_name_kanji,
      forecast_date,  -- 予測実施日
      prediction_date,  -- 予測対象日
      new_confirmed,
      new_deaths,
    FROM
      `bigquery-public-data.covid19_public_forecasts.japan_prefecture_28d`
    WHERE forecast_date > (SELECT MAX(forecast_date) FROM `dataform-advent-calendar.covid19.japan_forecast`)) as insertions
    
    
    INSERT의 경우 SQL 블록의 다음 섹션이 평가됩니다.
    ${ when(incremental(), `WHERE forecast_date > (SELECT MAX(forecast_date) FROM ${self()})`) }
    
    increamental(INSERT)에서 WHERE forecast_date > (SELECT MAX(forecast_date) FROM ${self()})를 검색에 추가합니다.${self()} 정의된 섹션 테이블 자체의 테이블 지정을 포함합니다.

    일반 테이블을 정의하는 SQLX 만들기


    실제 값 보기와 예측 값 구분표를 정의하고 이 표와 결합하여 예측 값과 실제 값을 비교하는 표를 만듭니다.
    definitions/japan_forecast_comparison.sqlx
    config {
      type: "table",
      schema: "covid19",
      name: "japan_forecast_comparison",
      description: "予測値評価(日別)",
      columns: {
        date: "日",
        prefecture_name_kanji: "都道府県名",
        forecast_new_confirmed: "新規感染者数(予測値)",
        new_confirmed: "新規感染者数",
        forecast_new_deaths: "新規死者数(予測値)",
        new_deaths: "新規死者数",
        latest_forecast_date: "最新の予測実施日"
      },
      tags: ["daily"]
    }
    
    # standardSQL
    WITH t AS (
      SELECT
        f.prefecture_name_kanji,
        f.prediction_date AS date,
        f.forecast_date,
        MAX (f.forecast_date) OVER (
          PARTITION BY f.prefecture_code,
          f.prediction_date
        ) AS latest_forecast_date,
        j.new_confirmed,
        f.new_confirmed AS forecast_new_confirmed,
        j.new_deaths,
        f.new_deaths AS forecast_new_deaths
      FROM
        ${ref("japan_forecast")} f
        LEFT JOIN ${ref("japan_actual")} j ON f.prediction_date = j.date
        AND f.prefecture_code = j.prefecture_code
      WHERE
        f.prediction_date >= "2020-12-21"
    )
    SELECT
      date,
      prefecture_name_kanji,
      new_confirmed,
      forecast_new_confirmed,
      new_deaths,
      forecast_new_deaths,
      latest_forecast_date
    FROM
      t
    WHERE
      forecast_date = latest_forecast_date
    
    

    각 표의 의존 관계를 확인하다


    햄버거 메뉴에서 Dependency Tree를 열면 정의된 테이블과 뷰의 종속성을 확인할 수 있습니다.

    이를 통해 알 수 있듯이 모든 원본 표는 실제 값 보기와 예측 값의 구분표를 만들고 예측 비교 표에 연결된다.

    일정을 짜다


    매일 아침 9시(JST)에 전망치의 파티션 테이블과 예측 비교 테이블을 업데이트하고 싶어 일정을 짰다.
    파일 일람environments.json에서 열면 다음 화면이 표시됩니다(View as plaain text 각도를 사용하면 json을 직접 편집할 수도 있습니다).

    CREATE NEW SCHEDULE은 객체 레이블이나 실행 시 종속성 등을 지정합니다.
  • 스케줄링은cron 형식으로 표시
  • Tags to run에 수행 시간 지정 객체의 레이블 이름을 입력합니다.
  • 자판 미리 만들기forecast.qlx 및 japanforecast_comparison.sqlx의config에서 지정tags: ["daily"]
  • 실행 시 테이블 새로 고침 여부 선택
  • 종속성 포함 여부 선택
  • (메일이나 슬랙의 알림 설정을 미리 만든 경우) 어느 채널로 알림을 보낼지 선택
  • 성공만 실패만 모두 세 가지 모드 중에서 선택할 수 있습니다.
  • 끝말


    빅큐리를 사용하는 사람 중에서도 일정 조회에서 데이터 파이프라인을 열심히 진행하는 경우가 적지 않다.일정 조회는 의존 관계 등을 고려할 수 없다.작업 프로세스 엔진을 가져오지 않은 빅Query 사용자는 손실이 없는 도구를 가져왔다고 생각합니다(SQLX가 SQL을 아는 사람이라면 학습 원가도 높지 않을 것입니다).
    또한 Dataform의 파이프라인도 API를 통해 호출할 수 있기 때문에 클라우드 Composier 등 작업 프로세스 엔진을 가져오더라도 일련의 프로세스에 삽입할 수 있다.
    본고에서 언급하지 않았지만 데이터 품질 관리(분배)와 조회 단일 테스트 등 충실한 기능도 있다.겨울방학에는 Dataform을 업무 중에 활용해 보세요.

    좋은 웹페이지 즐겨찾기