Dataform에서 BigQuery 데이터 파이프라인 구축
BigQuery의 COVID-19 일반 공개 데이터 세트를 바탕으로 Google Cloud 패밀리의 일원인 Dataform에서 BigQuery의 데이터 파이프라인을 조합해 보았습니다.
Dataform은
Dataform은 기존 ETL 파이프라인이 아니라 ELT 파이프라인에 유용한 도구입니다.
ELT 파이프라인은 데이터 소스에서 추출(Extract)한 원본 데이터를 데이터 소프트웨어 하우스(DWH)(Load)로 직접 읽고 DWH 내에서 목적에 맞는 테이블 구조(Transform)로 변환하는 일련의 프로세스를 말합니다.Dataform은 ELT 파이프라인에서 Transform을 특화하는 도구입니다.
구글클라우드 산하
구글클라우드는 2020년 12월 8일(미국시간) 데이터폼브로드캐스트을 인수했다.동시에 모든 사용자는 데이터 폼을 무료로 사용할 수 있다.
Google Cloud에서 Dataform을 사용할 수 있어서 기쁩니다.우리는 전체 조직의 모든 사용자가 정보를 분석할 수 있도록 하는 사명을 계속 집행할 것이다.이번에는 모든 이용자에게 Dataform 서비스를 무료로 제공하기로 했다.앞으로 Dataform과 BigQuery의 우수한 기능을 조합하여 사용할 수 있기를 기대합니다.
실제로 시험해 보다
이번에 손수건으로 만든 Dataform의 소스 코드가 여기 있습니다.
전제 조건
이번 일
Dataform 프로젝트 시작
[GCC] 서비스 계정 만들기
대상 GCP 프로젝트에서 서비스 계정을 만들고 BigQuery 관리자 역할을 부여합니다.
서비스 계정을 만들면 JSON의 비밀 키가 다운로드되어 보관됩니다.
※ Dataform에서 발행된 BigQuery의 의뢰는 모두 해당 서비스 계정에서 수행됩니다.
사용자 계정 만들기
다음 링크에서 데이터form을 만드는 사용자 계정
[Dataform] 프로젝트 만들기
서명 완료 후새 프로젝트 작성.
(선택 가능) 모든 Giit 공급자가 버전 관리
Dataform에 생성된 소스 코드는 Giit에서 버전 관리를 수행합니다.Giit 관리 제공자를 임의(GiitHub, GiitLab, Azure DevOps)로 이전할 수도 있다.
Giit 공급자를 이전하려면 왼쪽 창의 프로젝트 settings > Version control에서 설정하십시오. (빈 창고와 방문 영패를 따로 준비해야 합니다.)
이른바 SQLX
Dataform에서 SQLX 형식으로 테이블과 뷰를 정의합니다.먼저 SQLX의 개요와 표기법을 이해하십시오.
소스 데이터를 정의하는 SQLX 만들기
SQLX를 사용하여 다음 테이블을 소스 데이터로 정의합니다.
bigquery-public-data.covid19_open_data.covid19_open_data
bigquery-public-data.covid19_public_forecasts.japan_prefecture_28d
bigquery-public-data.covid19_open_data.covid19_open_data
를 소스 데이터로 정의하는 SQLXdefinitions/declaration/covid19_open_data.sqlx
config {
type: "declaration",
database: "bigquery-public-data",
schema: "covid19_open_data",
name: "covid19_open_data"
}
config 블록의 type
에'declation'을 지정하여 읽기 원본의 프로젝트 ID(database), 데이터 집합 이름(schema), 표 이름(name)을 정의합니다.이 설명을 통해 테이블이나 보기를 만드는 SQLX에서 이 테이블을 호출하거나 데이터 form의 의존 관계를 설명할 수 있습니다.뷰를 정의하는 SQLX 작성
SQLX에서는 본도부현에 따라 신규 감염자와 신규 사망자를 산정하는 뷰를 정의해 빅큐리에 창설했다.
definitions/japan_actual.sqlx
config {
type: "view",
schema: "covid19",
name: "japan_actual",
description: "都道府県別感染状況(日別)",
columns: {
date: "日",
prefecture_code: "都道府県コード",
new_confirmed: "新規感染者数",
new_deaths: "新規死者数"
}
}
# standardSQL
SELECT
date,
REPLACE(location_key, "_", "-") AS prefecture_code,
SUM(new_confirmed) AS new_confirmed,
SUM(new_deceased) AS new_deaths
FROM
${ref("covid19_open_data")}
WHERE
country_code = "JP"
AND location_key <> "JP"
GROUP BY
date, prefecture_code
config 블록type
에'view'를 지정하여 보기의 설명(description)과 열의 설명(columns)을 정의합니다.SQL 블록에는 뷰에서 사용되는 표준 SQL이 설명되어 있습니다.FROM 문에서 방금 발표된 소스 데이터를
${ref("covid19_open_data")}
형식으로 호출합니다.이 형식으로 표 인용을 기술함으로써 데이터form의 의존 관계를 해석할 수 있다.SQLX를 작성하면 오른쪽 창에 뷰의 작성 위치나 종속성
bigquery-public-data.covid19_open_data.covid19_open_data
, 컴파일된 질의${ref("covid19_open_data")}
내용 등이 표시됩니다.PREVIEW RESULTS에서 조회 결과를 확인하고(실제로 BigQuery에서 조회 작업을 발행할 때 요금 등을 주의해야 함) CREATE VIEW에서 보기를 만들 수 있습니다.파티션 테이블을 정의하는 SQLX 만들기
bigquery-public-data.covid19_public_forecasts.japan_prefecture_28d
부터 분구표에 각 도도부현의 감염 상황 예측(일별)을 작성하는 SQLX를 창설한다.definitions/japan_forecast.sqlx
config {
type: "incremental",
schema: "covid19",
name: "japan_forecast",
description: "都道府県別感染状況予測(日別)",
columns: {
prefecture_code: "都道府県コード",
prefecture_name_kanji: "都道府県名",
forecast_date: "予測実施日",
prediction_date: "予測対象日",
new_confirmed: "新規感染者数予測",
new_deaths: "新規死者数予測"
},
bigquery: {
partitionBy: "prediction_date"
},
tags: ["daily"]
}
# standardSQL
SELECT
prefecture_code,
prefecture_name_kanji,
forecast_date, -- 予測実施日
prediction_date, -- 予測対象日
new_confirmed,
new_deaths,
FROM
${ref("japan_prefecture_28d")}
${ when(incremental(), `WHERE forecast_date > (SELECT MAX(forecast_date) FROM ${self()})`) }
config 블록type
에서 "increamental"을 지정하면 섹션 테이블을 만들 수 있습니다.구역에 대한 세부 사항bigquery
에서 지정partitionBy
등.처음 또는 전체 새로 고침을 수행할 때 다음 질의 작업이 BigQuery에서 릴리즈되고 새 테이블이 작성되도록 지정합니다.
create or replace table `dataform-advent-calendar.covid19.japan_forecast` partition by prediction_date as
# standardSQL
SELECT
prefecture_code,
prefecture_name_kanji,
forecast_date, -- 予測実施日
prediction_date, -- 予測対象日
new_confirmed,
new_deaths,
FROM
`bigquery-public-data.covid19_public_forecasts.japan_prefecture_28d`
두 번째 이후에 실행할 때 다음과 같은 조회 작업을 발행하여 INSERT를 진행한다.insert into `dataform-advent-calendar.covid19.japan_forecast`
(prefecture_code,prefecture_name_kanji,forecast_date,prediction_date,new_confirmed,new_deaths)
select prefecture_code,prefecture_name_kanji,forecast_date,prediction_date,new_confirmed,new_deaths
from (
# standardSQL
SELECT
prefecture_code,
prefecture_name_kanji,
forecast_date, -- 予測実施日
prediction_date, -- 予測対象日
new_confirmed,
new_deaths,
FROM
`bigquery-public-data.covid19_public_forecasts.japan_prefecture_28d`
WHERE forecast_date > (SELECT MAX(forecast_date) FROM `dataform-advent-calendar.covid19.japan_forecast`)) as insertions
INSERT의 경우 SQL 블록의 다음 섹션이 평가됩니다.${ when(incremental(), `WHERE forecast_date > (SELECT MAX(forecast_date) FROM ${self()})`) }
increamental(INSERT)에서 WHERE forecast_date > (SELECT MAX(forecast_date) FROM ${self()})
를 검색에 추가합니다.${self()}
정의된 섹션 테이블 자체의 테이블 지정을 포함합니다.일반 테이블을 정의하는 SQLX 만들기
실제 값 보기와 예측 값 구분표를 정의하고 이 표와 결합하여 예측 값과 실제 값을 비교하는 표를 만듭니다.
definitions/japan_forecast_comparison.sqlx
config {
type: "table",
schema: "covid19",
name: "japan_forecast_comparison",
description: "予測値評価(日別)",
columns: {
date: "日",
prefecture_name_kanji: "都道府県名",
forecast_new_confirmed: "新規感染者数(予測値)",
new_confirmed: "新規感染者数",
forecast_new_deaths: "新規死者数(予測値)",
new_deaths: "新規死者数",
latest_forecast_date: "最新の予測実施日"
},
tags: ["daily"]
}
# standardSQL
WITH t AS (
SELECT
f.prefecture_name_kanji,
f.prediction_date AS date,
f.forecast_date,
MAX (f.forecast_date) OVER (
PARTITION BY f.prefecture_code,
f.prediction_date
) AS latest_forecast_date,
j.new_confirmed,
f.new_confirmed AS forecast_new_confirmed,
j.new_deaths,
f.new_deaths AS forecast_new_deaths
FROM
${ref("japan_forecast")} f
LEFT JOIN ${ref("japan_actual")} j ON f.prediction_date = j.date
AND f.prefecture_code = j.prefecture_code
WHERE
f.prediction_date >= "2020-12-21"
)
SELECT
date,
prefecture_name_kanji,
new_confirmed,
forecast_new_confirmed,
new_deaths,
forecast_new_deaths,
latest_forecast_date
FROM
t
WHERE
forecast_date = latest_forecast_date
각 표의 의존 관계를 확인하다
햄버거 메뉴에서 Dependency Tree를 열면 정의된 테이블과 뷰의 종속성을 확인할 수 있습니다.
이를 통해 알 수 있듯이 모든 원본 표는 실제 값 보기와 예측 값의 구분표를 만들고 예측 비교 표에 연결된다.
일정을 짜다
매일 아침 9시(JST)에 전망치의 파티션 테이블과 예측 비교 테이블을 업데이트하고 싶어 일정을 짰다.
파일 일람
environments.json
에서 열면 다음 화면이 표시됩니다(View as plaain text 각도를 사용하면 json을 직접 편집할 수도 있습니다).CREATE NEW SCHEDULE은 객체 레이블이나 실행 시 종속성 등을 지정합니다.
Tags to run
에 수행 시간 지정 객체의 레이블 이름을 입력합니다.tags: ["daily"]
끝말
빅큐리를 사용하는 사람 중에서도 일정 조회에서 데이터 파이프라인을 열심히 진행하는 경우가 적지 않다.일정 조회는 의존 관계 등을 고려할 수 없다.작업 프로세스 엔진을 가져오지 않은 빅Query 사용자는 손실이 없는 도구를 가져왔다고 생각합니다(SQLX가 SQL을 아는 사람이라면 학습 원가도 높지 않을 것입니다).
또한 Dataform의 파이프라인도 API를 통해 호출할 수 있기 때문에 클라우드 Composier 등 작업 프로세스 엔진을 가져오더라도 일련의 프로세스에 삽입할 수 있다.
본고에서 언급하지 않았지만 데이터 품질 관리(분배)와 조회 단일 테스트 등 충실한 기능도 있다.겨울방학에는 Dataform을 업무 중에 활용해 보세요.
Reference
이 문제에 관하여(Dataform에서 BigQuery 데이터 파이프라인 구축), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://zenn.dev/y2000/articles/10c9f8c20243ca03fd31텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)