BigQuery 테이블을 Python 클라이언트에서 조작
사전 준비
1. 자격증 취득
미리 BigQuery API를 이용하기 위한 서비스 계정을 만들고 자격 증명(JSON)을 다운로드합니다.
htps : // c ぉ d. 오, ぇ. 코 m / 드 cs / 맞아 ㅇ 치카 치온 / 껄껄 g-s r d? hl = 그럼
2. 필요한 패키지 설치
Pipfile[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
google-cloud-bigquery = "*"
google-cloud-bigquery-datatransfer = "*"
[requires]
python_version = "3.8"
테이블 재작성(Drop⇒Create)
1. 클라이언트 초기화
migrate_table.pyimport json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
[PATH TO CREDENTIAL]
는 자격 증명의 JSON 경로를 지정합니다.
2. 테이블 재생성
migrate_table.pytable = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
# 既存テーブルの削除
client.delete_table(table, not_found_ok=True)
# テーブルの作成
schema = [
bigquery.SchemaField('id', 'INT64'),
bigquery.SchemaField('name', 'STRING')
]
client.create_table(bigquery.Table(table, schema=schema))
[DATASET NAME]
및 [TABLE NAME]
는 작성할 데이터 세트 이름과 테이블 이름을 지정합니다.
기존 테이블을 삭제할 때 not_found_ok
플래그가 False
라고 테이블이 존재하지 않는 경우에 에러가 되므로 True
로 해 둡니다. DDL로 말하면 DROP TABLE IF EXISTS
같은 이미지.
테이블을 만들 때 스키마 정의로 열 이름과 형식을 지정해야 합니다. 디폴트에서는 Nullable인 컬럼이 되므로 Not Null인 컬럼으로 하는 경우는 모드를 지정해
bigquery.SchemaField('id', 'INT64', mode='REQUIRED')
합니다.
3. 데이터 가져오기
초기 데이터로 CSV 데이터를 가져옵니다.
미리 스키마 정의의 형태에 맞춘 데이터를 CSV로 준비해 둡니다.
import.csvid, name
1, hogehoge
2, fugafuga
migrate_table.py# 初期データのインポート
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
with open('import.csv', 'rb') as sourceData:
job = client.load_table_from_file(sourceData, table, job_config=job_config)
job.result()
CSV의 헤더를 읽는 경우는 skip_leading_rows
로 스킵 행수를 지정하면 OK.
View 재생성
migrate_view.pyimport json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
client.delete_table(table, not_found_ok=True)
view = bigquery.Table(table)
view.view_query = 'SELECT * FROM dataset_name.table_name'
client.create_table(view)
뷰의 경우도 실체 테이블과 거의 흐름은 동일하다.
다른 점은, 뷰의 경우는 view_query
에 SQL의 쿼리를 직접 지정하는 점.
스키마 정의는 SQL 쿼리에서 자동으로 구성되므로 지정할 필요가 없습니다.
일정 업데이트
Scheduled Query를 사용하여 정기적으로 데이터를 세분화하는 장면에서 실행 쿼리 및 스케줄링 설정을 변경하려는 경우.
migrate_schedule.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud import bigquery_datatransfer_v1
import google.protobuf.json_format
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery_datatransfer_v1.DataTransferServiceClient(
credentials=credentials
)
config = google.protobuf.json_format.ParseDict(
{
"name": '[RESOURCE NAME]',
"destination_dataset_id": '[DATASET NAME]',
"display_name": '[DISPLAY NAME]',
"data_source_id": "scheduled_query",
"params": {
"query": "SELECT * FROM dataset_name.table_name",
"destination_table_name_template": '[TABLE NAME]',
"write_disposition": "WRITE_TRUNCATE",
"partitioning_field": "",
},
"schedule": "every 24 hours",
},
bigquery_datatransfer_v1.types.TransferConfig(),
)
update_mask = {"paths": ["display_name", "params", "schedule"]}
response = client.update_transfer_config(
config, update_mask
)
스케줄 설정 내용( config
)과 업데이트 마스크( update_mask
)를 update_transfer_config
에 전달합니다.update_mask
는 config
에서 어느 필드를 갱신할지를 지정하는 것입니다. update_mask
에 포함되지 않는 설정치는 갱신의 대상외가 됩니다.
config
의 설정값의 상세는 이하와 같다.
이름
스케줄의 자원 이름을 지정합니다.
리소스 이름은 BigQuery 콘솔의 스케줄된 쿼리에서 스케줄 목록을 표시하므로 해당 스케줄의 구성 정보에서 확인할 수 있습니다.
destination_dataset_id
스케줄에서 실행한 쿼리 결과의 대상 데이터 세트 이름을 지정합니다.
display_name
스케줄의 표시명이므로 임의의 이름을 붙입니다.
params.query
실행할 쿼리를 지정합니다.
params.destination_table_name_template
스케줄에서 실행한 쿼리 결과의 대상 테이블 이름을 지정합니다.
params.write_disposition
테이블에 저장하는 방법을 지정합니다.WRITE_TRUNCATE
를 지정하면 기존 테이블이 실행 쿼리의 결과로 바뀝니다.WRITE_APPEND
를 지정하면 기존 테이블에 실행 쿼리 결과가 추가됩니다.
schedule
스케줄의 실행 타이밍을 지정합니다.
ex.
1시간마다 실행・・・every 1 hour
매일 0시에 실행 ... every day 00:00
결론
BigQuery를 데이터 마트적으로 사용하는 경우 콘솔에서 테이블을 만들면 변경 관리가 어렵기 때문에 코드에 떨어뜨려 두면 git으로 관리 할 수있어 추천합니다.
Reference
이 문제에 관하여(BigQuery 테이블을 Python 클라이언트에서 조작), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/tikamoto/items/c269adcc9a3332484686
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
google-cloud-bigquery = "*"
google-cloud-bigquery-datatransfer = "*"
[requires]
python_version = "3.8"
1. 클라이언트 초기화
migrate_table.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
[PATH TO CREDENTIAL]
는 자격 증명의 JSON 경로를 지정합니다.2. 테이블 재생성
migrate_table.py
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
# 既存テーブルの削除
client.delete_table(table, not_found_ok=True)
# テーブルの作成
schema = [
bigquery.SchemaField('id', 'INT64'),
bigquery.SchemaField('name', 'STRING')
]
client.create_table(bigquery.Table(table, schema=schema))
[DATASET NAME]
및 [TABLE NAME]
는 작성할 데이터 세트 이름과 테이블 이름을 지정합니다.기존 테이블을 삭제할 때
not_found_ok
플래그가 False
라고 테이블이 존재하지 않는 경우에 에러가 되므로 True
로 해 둡니다. DDL로 말하면 DROP TABLE IF EXISTS
같은 이미지.테이블을 만들 때 스키마 정의로 열 이름과 형식을 지정해야 합니다. 디폴트에서는 Nullable인 컬럼이 되므로 Not Null인 컬럼으로 하는 경우는 모드를 지정해
bigquery.SchemaField('id', 'INT64', mode='REQUIRED')
합니다.
3. 데이터 가져오기
초기 데이터로 CSV 데이터를 가져옵니다.
미리 스키마 정의의 형태에 맞춘 데이터를 CSV로 준비해 둡니다.
import.csv
id, name
1, hogehoge
2, fugafuga
migrate_table.py
# 初期データのインポート
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
with open('import.csv', 'rb') as sourceData:
job = client.load_table_from_file(sourceData, table, job_config=job_config)
job.result()
CSV의 헤더를 읽는 경우는
skip_leading_rows
로 스킵 행수를 지정하면 OK.View 재생성
migrate_view.pyimport json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
client.delete_table(table, not_found_ok=True)
view = bigquery.Table(table)
view.view_query = 'SELECT * FROM dataset_name.table_name'
client.create_table(view)
뷰의 경우도 실체 테이블과 거의 흐름은 동일하다.
다른 점은, 뷰의 경우는 view_query
에 SQL의 쿼리를 직접 지정하는 점.
스키마 정의는 SQL 쿼리에서 자동으로 구성되므로 지정할 필요가 없습니다.
일정 업데이트
Scheduled Query를 사용하여 정기적으로 데이터를 세분화하는 장면에서 실행 쿼리 및 스케줄링 설정을 변경하려는 경우.
migrate_schedule.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud import bigquery_datatransfer_v1
import google.protobuf.json_format
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery_datatransfer_v1.DataTransferServiceClient(
credentials=credentials
)
config = google.protobuf.json_format.ParseDict(
{
"name": '[RESOURCE NAME]',
"destination_dataset_id": '[DATASET NAME]',
"display_name": '[DISPLAY NAME]',
"data_source_id": "scheduled_query",
"params": {
"query": "SELECT * FROM dataset_name.table_name",
"destination_table_name_template": '[TABLE NAME]',
"write_disposition": "WRITE_TRUNCATE",
"partitioning_field": "",
},
"schedule": "every 24 hours",
},
bigquery_datatransfer_v1.types.TransferConfig(),
)
update_mask = {"paths": ["display_name", "params", "schedule"]}
response = client.update_transfer_config(
config, update_mask
)
스케줄 설정 내용( config
)과 업데이트 마스크( update_mask
)를 update_transfer_config
에 전달합니다.update_mask
는 config
에서 어느 필드를 갱신할지를 지정하는 것입니다. update_mask
에 포함되지 않는 설정치는 갱신의 대상외가 됩니다.
config
의 설정값의 상세는 이하와 같다.
이름
스케줄의 자원 이름을 지정합니다.
리소스 이름은 BigQuery 콘솔의 스케줄된 쿼리에서 스케줄 목록을 표시하므로 해당 스케줄의 구성 정보에서 확인할 수 있습니다.
destination_dataset_id
스케줄에서 실행한 쿼리 결과의 대상 데이터 세트 이름을 지정합니다.
display_name
스케줄의 표시명이므로 임의의 이름을 붙입니다.
params.query
실행할 쿼리를 지정합니다.
params.destination_table_name_template
스케줄에서 실행한 쿼리 결과의 대상 테이블 이름을 지정합니다.
params.write_disposition
테이블에 저장하는 방법을 지정합니다.WRITE_TRUNCATE
를 지정하면 기존 테이블이 실행 쿼리의 결과로 바뀝니다.WRITE_APPEND
를 지정하면 기존 테이블에 실행 쿼리 결과가 추가됩니다.
schedule
스케줄의 실행 타이밍을 지정합니다.
ex.
1시간마다 실행・・・every 1 hour
매일 0시에 실행 ... every day 00:00
결론
BigQuery를 데이터 마트적으로 사용하는 경우 콘솔에서 테이블을 만들면 변경 관리가 어렵기 때문에 코드에 떨어뜨려 두면 git으로 관리 할 수있어 추천합니다.
Reference
이 문제에 관하여(BigQuery 테이블을 Python 클라이언트에서 조작), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/tikamoto/items/c269adcc9a3332484686
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
client.delete_table(table, not_found_ok=True)
view = bigquery.Table(table)
view.view_query = 'SELECT * FROM dataset_name.table_name'
client.create_table(view)
Scheduled Query를 사용하여 정기적으로 데이터를 세분화하는 장면에서 실행 쿼리 및 스케줄링 설정을 변경하려는 경우.
migrate_schedule.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud import bigquery_datatransfer_v1
import google.protobuf.json_format
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery_datatransfer_v1.DataTransferServiceClient(
credentials=credentials
)
config = google.protobuf.json_format.ParseDict(
{
"name": '[RESOURCE NAME]',
"destination_dataset_id": '[DATASET NAME]',
"display_name": '[DISPLAY NAME]',
"data_source_id": "scheduled_query",
"params": {
"query": "SELECT * FROM dataset_name.table_name",
"destination_table_name_template": '[TABLE NAME]',
"write_disposition": "WRITE_TRUNCATE",
"partitioning_field": "",
},
"schedule": "every 24 hours",
},
bigquery_datatransfer_v1.types.TransferConfig(),
)
update_mask = {"paths": ["display_name", "params", "schedule"]}
response = client.update_transfer_config(
config, update_mask
)
스케줄 설정 내용(
config
)과 업데이트 마스크( update_mask
)를 update_transfer_config
에 전달합니다.update_mask
는 config
에서 어느 필드를 갱신할지를 지정하는 것입니다. update_mask
에 포함되지 않는 설정치는 갱신의 대상외가 됩니다.config
의 설정값의 상세는 이하와 같다.이름
스케줄의 자원 이름을 지정합니다.
리소스 이름은 BigQuery 콘솔의 스케줄된 쿼리에서 스케줄 목록을 표시하므로 해당 스케줄의 구성 정보에서 확인할 수 있습니다.
destination_dataset_id
스케줄에서 실행한 쿼리 결과의 대상 데이터 세트 이름을 지정합니다.
display_name
스케줄의 표시명이므로 임의의 이름을 붙입니다.
params.query
실행할 쿼리를 지정합니다.
params.destination_table_name_template
스케줄에서 실행한 쿼리 결과의 대상 테이블 이름을 지정합니다.
params.write_disposition
테이블에 저장하는 방법을 지정합니다.
WRITE_TRUNCATE
를 지정하면 기존 테이블이 실행 쿼리의 결과로 바뀝니다.WRITE_APPEND
를 지정하면 기존 테이블에 실행 쿼리 결과가 추가됩니다.schedule
스케줄의 실행 타이밍을 지정합니다.
ex.
1시간마다 실행・・・every 1 hour
매일 0시에 실행 ... every day 00:00
결론
BigQuery를 데이터 마트적으로 사용하는 경우 콘솔에서 테이블을 만들면 변경 관리가 어렵기 때문에 코드에 떨어뜨려 두면 git으로 관리 할 수있어 추천합니다.
Reference
이 문제에 관하여(BigQuery 테이블을 Python 클라이언트에서 조작), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/tikamoto/items/c269adcc9a3332484686
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
Reference
이 문제에 관하여(BigQuery 테이블을 Python 클라이언트에서 조작), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/tikamoto/items/c269adcc9a3332484686텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)