BigQuery 테이블을 Python 클라이언트에서 조작

14789 단어 파이썬BigQuery

사전 준비



1. 자격증 취득



미리 BigQuery API를 이용하기 위한 서비스 계정을 만들고 자격 증명(JSON)을 다운로드합니다.
htps : // c ぉ d. 오, ぇ. 코 m / 드 cs / 맞아 ㅇ 치카 치온 / 껄껄 g-s r d? hl = 그럼

2. 필요한 패키지 설치



Pipfile
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
google-cloud-bigquery = "*"
google-cloud-bigquery-datatransfer = "*"

[requires]
python_version = "3.8"

테이블 재작성(Drop⇒Create)



1. 클라이언트 초기화



migrate_table.py
import json

from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)
[PATH TO CREDENTIAL] 는 자격 증명의 JSON 경로를 지정합니다.

2. 테이블 재생성



migrate_table.py
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')

# 既存テーブルの削除
client.delete_table(table, not_found_ok=True)

# テーブルの作成
schema = [
    bigquery.SchemaField('id', 'INT64'),
    bigquery.SchemaField('name', 'STRING')
]
client.create_table(bigquery.Table(table, schema=schema))
[DATASET NAME][TABLE NAME]는 작성할 데이터 세트 이름과 테이블 이름을 지정합니다.

기존 테이블을 삭제할 때 not_found_ok 플래그가 False 라고 테이블이 존재하지 않는 경우에 에러가 되므로 True 로 해 둡니다. DDL로 말하면 DROP TABLE IF EXISTS 같은 이미지.

테이블을 만들 때 스키마 정의로 열 이름과 형식을 지정해야 합니다. 디폴트에서는 Nullable인 컬럼이 되므로 Not Null인 컬럼으로 하는 경우는 모드를 지정해
bigquery.SchemaField('id', 'INT64', mode='REQUIRED')

합니다.

3. 데이터 가져오기



초기 데이터로 CSV 데이터를 가져옵니다.
미리 스키마 정의의 형태에 맞춘 데이터를 CSV로 준비해 둡니다.

import.csv
id, name
1, hogehoge
2, fugafuga

migrate_table.py
# 初期データのインポート
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
with open('import.csv', 'rb') as sourceData:
    job = client.load_table_from_file(sourceData, table, job_config=job_config)
job.result()

CSV의 헤더를 읽는 경우는 skip_leading_rows 로 스킵 행수를 지정하면 OK.

View 재생성



migrate_view.py
import json

from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')

client.delete_table(table, not_found_ok=True)
view = bigquery.Table(table)
view.view_query = 'SELECT * FROM dataset_name.table_name'
client.create_table(view)


뷰의 경우도 실체 테이블과 거의 흐름은 동일하다.
다른 점은, 뷰의 경우는 view_query 에 SQL의 쿼리를 직접 지정하는 점.
스키마 정의는 SQL 쿼리에서 자동으로 구성되므로 지정할 필요가 없습니다.

일정 업데이트



Scheduled Query를 사용하여 정기적으로 데이터를 세분화하는 장면에서 실행 쿼리 및 스케줄링 설정을 변경하려는 경우.

migrate_schedule.py

import json

from google.cloud import bigquery
from google.oauth2 import service_account

from google.cloud import bigquery_datatransfer_v1
import google.protobuf.json_format

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery_datatransfer_v1.DataTransferServiceClient(
    credentials=credentials
)

config = google.protobuf.json_format.ParseDict(
    {
        "name": '[RESOURCE NAME]',
        "destination_dataset_id": '[DATASET NAME]',
        "display_name": '[DISPLAY NAME]',
        "data_source_id": "scheduled_query",
        "params": {
            "query": "SELECT * FROM dataset_name.table_name",
            "destination_table_name_template": '[TABLE NAME]',
            "write_disposition": "WRITE_TRUNCATE",
            "partitioning_field": "",
        },
        "schedule": "every 24 hours",
    },
    bigquery_datatransfer_v1.types.TransferConfig(),
)

update_mask = {"paths": ["display_name", "params", "schedule"]}

response = client.update_transfer_config(
    config, update_mask 
) 


스케줄 설정 내용( config )과 업데이트 마스크( update_mask )를 update_transfer_config 에 전달합니다.update_maskconfig 에서 어느 필드를 갱신할지를 지정하는 것입니다. update_mask 에 포함되지 않는 설정치는 갱신의 대상외가 됩니다.
config 의 설정값의 상세는 이하와 같다.

이름

스케줄의 자원 이름을 지정합니다.
리소스 이름은 BigQuery 콘솔의 스케줄된 쿼리에서 스케줄 목록을 표시하므로 해당 스케줄의 구성 정보에서 확인할 수 있습니다.


destination_dataset_id

스케줄에서 실행한 쿼리 결과의 대상 데이터 세트 이름을 지정합니다.

display_name

스케줄의 표시명이므로 임의의 이름을 붙입니다.

params.query

실행할 쿼리를 지정합니다.

params.destination_table_name_template

스케줄에서 실행한 쿼리 결과의 대상 테이블 이름을 지정합니다.

params.write_disposition

테이블에 저장하는 방법을 지정합니다.WRITE_TRUNCATE를 지정하면 기존 테이블이 실행 쿼리의 결과로 바뀝니다.WRITE_APPEND를 지정하면 기존 테이블에 실행 쿼리 결과가 추가됩니다.

schedule

스케줄의 실행 타이밍을 지정합니다.
ex.
1시간마다 실행・・・every 1 hour
매일 0시에 실행 ... every day 00:00

결론



BigQuery를 데이터 마트적으로 사용하는 경우 콘솔에서 테이블을 만들면 변경 관리가 어렵기 때문에 코드에 떨어뜨려 두면 git으로 관리 할 수있어 추천합니다.

좋은 웹페이지 즐겨찾기