Redshift 데이터를 BigQuery에 로드
그 때의 수법이나 순서 등을 기재합니다.
전제
처리 내용
크게는 다음과 같은 흐름으로 작성했습니다.
언로드
query = sprintf("
UNLOAD ('SELECT * FROM %<schema>s.%<table>s')
TO 's3://redshift-unload-bucket/unload/%<schema>s/%<table>s/'
IAM_ROLE 'arn:aws:iam::************:role/redshift-unload-role'
MANIFEST
DELIMITER AS ','
GZIP
ADDQUOTES
ALLOWOVERWRITE
PARALLEL OFF
ESCAPE
MAXFILESIZE AS 50MB
", sql:sql, schema: schema, table: table)
ActiveRecord::Base.connection.execute query
언로드를 위한 IAM 역할 만들기
역할 이름: redshift-unload-role
정책: AmazonS3FullAccess, AmazonRedshiftFullAccess
특기사항
DELIMITER AS ','
)으로 출력합니다.ADDQUOTES
및 ESCAPE
옵션을 추가하십시오 PARALLEL ON
하면 왜 레코드가 누락되는 이벤트가 발생했기 때문에 OFF
에 MAXFILESIZE
에서 50MB
스키마 파일 작성
Redshift의 pg_catalog.pg_table_def에서 스키마 정보를 얻을 수 있습니다.
columns = []
sql = sprintf("
SELECT * FROM pg_catalog.pg_table_def
WHERE schemaname = '%{schema}' AND tablename = '%{table}'
", schema: schema, table: table)
result = ActiveRecord::Base.connection.execute sql
result.each{|record|
column = {mode:get_mode(record["notnull"]), name:record["column"], type:get_type(record["type"])}
columns.push(column)
}
스키마 파일 출력
file = "./schema/%{schema}/%{table}.schema" % {schema:schema, table:table}
File.open(file, 'w') do |f|
f.puts columns.to_json
end
보충
Redshift와 BigQuery 유형의 차이를 보완하기위한 메소드를 다음과 같이 만듭니다.
def self.get_mode(notnull)
if notnull == 'y' then
return 'REQUIRED'
end
return 'NULLABLE'
end
def self.get_type(type)
if type.index('character') then
return 'STRING'
elsif type.index('bigint') then
return 'INTEGER'
elsif type.index('integer') then
return 'INTEGER'
elsif type.index('numeric') then
return 'NUMERIC'
elsif type.index('boolean') then
return 'BOOL'
elsif type.index('timestamp') then
return 'TIMESTAMP'
elsif type.index('date') then
return 'DATE'
end
return 'STRING'
end
bq load 명령으로 BigQuery로 가져오기
S3에서 언로드 된 객체를 가져 와서 루프 내에서
S3에서 GS로 업로드
bq load
명령을 실행 중입니다.prefix = "unload/%{schema}/%{table}/" % {schema:schema, table:table}
object_list = get_objects(prefix)
for object in object_list do
if object.index('.gz') then
s3_to_gs(object)
gs_path = "gs://%{bucket}/%{object}" % {bucket:$gs_bucket, object:object}
schema_path = "./schema/%{schema}/%{table}.schema" % {schema:schema, table:table}
command = "bq load %{dataset}.%{table} %{gs_path} %{schema_path}" % {
dataset:schema, table:table, gs_path:gs_path, schema_path:schema_path}
system(command)
end
end
보충
언로드 한 S3의 파일을 얻는 방법은 다음과 같습니다.
def self.s3_to_gs(object)
s3_path = "s3://%{bucket}/%{object}" % {bucket:$s3_bucket, object:object}
gs_path = "gs://%{bucket}/%{object}" % {bucket:$gs_bucket, object:object}
command = "gsutil cp -r %{s3_path} %{gs_path}" % {s3_path:s3_path, gs_path:gs_path}
system(command + $command_output)
end
언로드 한 S3의 파일을 얻는 방법은 다음과 같습니다.
def self.get_objects(prefix)
object_list = []
next_token = ''
while true do
if next_token == '' then
response = $s3.list_objects_v2(bucket: $s3_bucket, prefix: prefix)
else
response = $s3.list_objects_v2(bucket: $s3_bucket, prefix: prefix, continuation_token: next_token)
end
if response.contents.length == 0 then
return []
end
for content in response.contents do
object_list.push(content.key)
end
if response.next_continuation_token != nil then
next_token = response.next_continuation_token
else
return object_list
end
end
end
마지막으로
bq load
의 --autodetect
옵션을 이용하면 스키마 정보를 자동 검출하는 것도 가능합니다. Reference
이 문제에 관하여(Redshift 데이터를 BigQuery에 로드), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/k63207/items/0776ba491e98ab589ec7텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)