Redshift 데이터를 BigQuery에 로드

15447 단어 BigQueryredshiftRails5
BigQuery를 활용하기 위해 Redshift 데이터를 BigQuery로 마이그레이션해야 했습니다.
그 때의 수법이나 순서 등을 기재합니다.

전제


  • Rails에서 Redshift 쿼리를 실행할 수 있도록하십시오
  • bq 명령을 실행할 수 있어야합니다
  • AWS SDK가 설치되어 있습니다

  • 처리 내용



    크게는 다음과 같은 흐름으로 작성했습니다.
  • Redshift 데이터를 S3으로 언로드
  • 대상 테이블의 스키마 파일 만들기
  • bq load 명령으로 데이터 가져 오기

  • 언로드


    query = sprintf("
      UNLOAD ('SELECT * FROM %<schema>s.%<table>s')
      TO 's3://redshift-unload-bucket/unload/%<schema>s/%<table>s/'
      IAM_ROLE 'arn:aws:iam::************:role/redshift-unload-role'
      MANIFEST
      DELIMITER AS ','
      GZIP
      ADDQUOTES
      ALLOWOVERWRITE
      PARALLEL OFF
      ESCAPE
      MAXFILESIZE AS 50MB
    ", sql:sql, schema: schema, table: table)
    ActiveRecord::Base.connection.execute query
    

    언로드를 위한 IAM 역할 만들기

    역할 이름: redshift-unload-role
    정책: AmazonS3FullAccess, AmazonRedshiftFullAccess


    특기사항
  • BigQuery로드를 지원하는 CSV 형식 (DELIMITER AS ',')으로 출력합니다.
  • BigQuery에로드 할 때 CSV 구조가 무너지지 않도록 ADDQUOTESESCAPE 옵션을 추가하십시오
  • PARALLEL ON 하면 왜 레코드가 누락되는 이벤트가 발생했기 때문에 OFF
  • 데이터 양이 너무 많으면 BigQuery 로딩이 작동하지 않는 문제가 발생했기 때문에 MAXFILESIZE 에서 50MB

    스키마 파일 작성



    Redshift의 pg_catalog.pg_table_def에서 스키마 정보를 얻을 수 있습니다.
    columns = []
    sql = sprintf("
      SELECT * FROM pg_catalog.pg_table_def
      WHERE schemaname = '%{schema}' AND tablename = '%{table}'
    ", schema: schema, table: table)
    result = ActiveRecord::Base.connection.execute sql
    result.each{|record|
      column = {mode:get_mode(record["notnull"]), name:record["column"], type:get_type(record["type"])}
      columns.push(column)
    }
    

    스키마 파일 출력
    file = "./schema/%{schema}/%{table}.schema" % {schema:schema, table:table}
    File.open(file, 'w') do |f|
      f.puts columns.to_json
    end
    

    보충

    Redshift와 BigQuery 유형의 차이를 보완하기위한 메소드를 다음과 같이 만듭니다.
    def self.get_mode(notnull)
      if notnull == 'y' then
        return 'REQUIRED'
      end
      return 'NULLABLE'
    end
    
    def self.get_type(type)
      if type.index('character') then
        return 'STRING'
      elsif type.index('bigint') then
        return 'INTEGER'
      elsif type.index('integer') then
        return 'INTEGER'
      elsif type.index('numeric') then
        return 'NUMERIC'
      elsif type.index('boolean') then
        return 'BOOL'
      elsif type.index('timestamp') then
        return 'TIMESTAMP'
      elsif type.index('date') then
        return 'DATE'
      end
      return 'STRING'
    end
    

    bq load 명령으로 BigQuery로 가져오기



    S3에서 언로드 된 객체를 가져 와서 루프 내에서
    S3에서 GS로 업로드 bq load 명령을 실행 중입니다.
    prefix = "unload/%{schema}/%{table}/" % {schema:schema, table:table}
    object_list = get_objects(prefix)
    for object in object_list do
      if object.index('.gz') then
        s3_to_gs(object)
        gs_path = "gs://%{bucket}/%{object}" % {bucket:$gs_bucket, object:object}
        schema_path = "./schema/%{schema}/%{table}.schema" % {schema:schema, table:table}
        command = "bq load %{dataset}.%{table} %{gs_path} %{schema_path}" % {
          dataset:schema, table:table, gs_path:gs_path, schema_path:schema_path}
        system(command)
      end
    end
    

    보충

    언로드 한 S3의 파일을 얻는 방법은 다음과 같습니다.
    def self.s3_to_gs(object)
      s3_path = "s3://%{bucket}/%{object}" % {bucket:$s3_bucket, object:object}
      gs_path = "gs://%{bucket}/%{object}" % {bucket:$gs_bucket, object:object}
      command = "gsutil cp -r %{s3_path} %{gs_path}" % {s3_path:s3_path, gs_path:gs_path}
      system(command + $command_output)
    end
    

    언로드 한 S3의 파일을 얻는 방법은 다음과 같습니다.
    def self.get_objects(prefix)
      object_list = []
      next_token = ''
      while true do
        if next_token == '' then
          response = $s3.list_objects_v2(bucket: $s3_bucket, prefix: prefix)
        else
          response = $s3.list_objects_v2(bucket: $s3_bucket, prefix: prefix, continuation_token: next_token)
        end
        if response.contents.length == 0 then
          return []
        end
        for content in response.contents do
          object_list.push(content.key)
        end
        if response.next_continuation_token != nil then
          next_token = response.next_continuation_token
        else
          return object_list
        end
      end
    end
    

    마지막으로


  • 여기에서는 스키마 파일을 작성해 BigQuery에 로드 했습니다만, bq load--autodetect 옵션을 이용하면 스키마 정보를 자동 검출하는 것도 가능합니다.
  • 자동 검출에서는 데이터의 값에 따라 문자열, 수치, 시간은 높은 정밀도로 검출해 주었습니다. 쉽게 로드하고 싶은 경우는 꽤 붙인다고 생각합니다.
  • 데이터의 사이즈가 커지면 이번 `bq load'의 방법에서는 상당한 시간이 걸리는 것이 상정됩니다. dataflow를 사용하여 BigQuery로의 데이터 로드를 대폭 줄일 수 있었기 때문에 또 기회에 기사로 할 수 있으면 좋겠습니다.
  • 좋은 웹페이지 즐겨찾기