Beam에서 Scheema를 사용하면 BigQuery의 쓰기가 쉬워집니다.

5034 단어 GCPdataflowtech

뱀이라는 Schema.


https://beam.apache.org/documentation/programming-guide/#schemas
Beam 처리 데이터에 추상적인 기능을 가져옵니다.빅큐리의 패턴과는 다른 개념(※)이다.
변환할 도구가 있습니다.

BigQueryIO와의 관계


Schema를 사용하지 않는 경우BigQueryIO로 쓰기
  • 쓸 데이터 준비
  • 1.에서 준비된 데이터(유형 종속 도메인)를 TableRow
  • 로 변환

  • TableSchema 준비
  • BigQueryIO.write
  • 단계.
    1. 에 사용된 데이터 유형은 Schema를 지정하여 (무료한) 교체 처리를 생략할 수 있습니다.

    Scheam을 사용하지 않는 경우


    BigQueryTornadoes를 기초로 한다.
      // 元データの入るクラス
      public static class Tornado {
        // AvroCoderに必要
        public TornadoScheman() {}
        
        public Integer getMonth() {
          return month;
        }
        public Long getCount() {
          return count;
        }
        private Integer month;
        private Long count;
        public Tornado (final Integer month, final Long count) {
          this.month = month;
          this.count = count;
        }
      }
    
        // ダミーの入力データ
        domainResult = p.apply(Create.of(
                new Tornado(1, 1l),
                new Tornado(2, 3l)
        )).withCoder(AvroCoder.of(Tornado.class)));
    
         // TableRowへの詰替(2)
        final PCollection<TableRow>tableRowResult = domainResult.apply(ParDo.of(new DoFn<Tornado, TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext c)  {
            TableRow row = new TableRow()
                    .set("month", c.element().getMonth())
                    .set("tornado_count", c.element().getCount());
            c.output(row);
          }
        }));
        // TableScheamの準備(3)
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
        TableSchema schema = new TableSchema().setFields(fields);
    
    
        // BigQueryIOで書き込み
        tableRowResult.apply(
                BigQueryIO.writeTableRows()
                        .withSchema(schema)
                        .to(options.getOutput())
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
    
    개조 처리(2부분)는 간단하지만 심심하니 누가 해줬으면 좋겠어요.

    Scheam 사용 시


    추가할 항목
  • 원시 데이터의 클래스(Tornado)에 패턴 정의를 추가하는 근사(DefaultSchema와 ScheemaCreate)(TornadoSchema)
  • BigQueryIO에 useBeam Schema
  • 추가
    그냥그리고 Schema를 사용하면
  • Coder의 추정
  • TableScheam의 추정
  • (withCoder 및 빈 구조기, withSchema 부분)
      // Scheamのあるクラスの定義
      // アノテーション追加
      @DefaultSchema(JavaBeanSchema.class)
      public static class TornadoSchema {
        public Integer getMonth() {
          return month;
        }
        public Long getCount() {
          return count;
        }
        private final Integer month;
        private final Long count;
        // アノテーション追加
        @SchemaCreate
        public TornadoSchema (final Integer month, final Long count) {
          this.month = month;
          this.count = count;
        }
      }
    
        domainResult.apply(
                BigQueryIO.<TornadoSchema>write()
                        // 追加
    		    .useBeamSchema()
                        .to(options.getOutput())
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
    
    는 Scheema를 사용하지 않는 경우에 비해 장소의 교체 부분이 없어 매끄럽다.

    주의


    Create.of에서 Schema를 사용할 때는 pom에서 플러그인 지정가 필요합니다.

    읽을 때 어때요?


    조사 중...
    읽을 때도 GenericRecord(Avoro)부터 교체하기 때문에 편하다.

    좋은 웹페이지 즐겨찾기