Beam에서 Scheema를 사용하면 BigQuery의 쓰기가 쉬워집니다.
뱀이라는 Schema.
Beam 처리 데이터에 추상적인 기능을 가져옵니다.빅큐리의 패턴과는 다른 개념(※)이다.
※ 변환할 도구가 있습니다.
BigQueryIO와의 관계
Schema를 사용하지 않는 경우BigQueryIO로 쓰기
TableSchema 준비
1. 에 사용된 데이터 유형은 Schema를 지정하여 (무료한) 교체 처리를 생략할 수 있습니다.
Scheam을 사용하지 않는 경우
BigQueryTornadoes를 기초로 한다.
// 元データの入るクラス
public static class Tornado {
// AvroCoderに必要
public TornadoScheman() {}
public Integer getMonth() {
return month;
}
public Long getCount() {
return count;
}
private Integer month;
private Long count;
public Tornado (final Integer month, final Long count) {
this.month = month;
this.count = count;
}
}
// ダミーの入力データ
domainResult = p.apply(Create.of(
new Tornado(1, 1l),
new Tornado(2, 3l)
)).withCoder(AvroCoder.of(Tornado.class)));
// TableRowへの詰替(2)
final PCollection<TableRow>tableRowResult = domainResult.apply(ParDo.of(new DoFn<Tornado, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
TableRow row = new TableRow()
.set("month", c.element().getMonth())
.set("tornado_count", c.element().getCount());
c.output(row);
}
}));
// TableScheamの準備(3)
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
TableSchema schema = new TableSchema().setFields(fields);
// BigQueryIOで書き込み
tableRowResult.apply(
BigQueryIO.writeTableRows()
.withSchema(schema)
.to(options.getOutput())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
개조 처리(2부분)는 간단하지만 심심하니 누가 해줬으면 좋겠어요.Scheam 사용 시
추가할 항목
그냥그리고 Schema를 사용하면
// Scheamのあるクラスの定義
// アノテーション追加
@DefaultSchema(JavaBeanSchema.class)
public static class TornadoSchema {
public Integer getMonth() {
return month;
}
public Long getCount() {
return count;
}
private final Integer month;
private final Long count;
// アノテーション追加
@SchemaCreate
public TornadoSchema (final Integer month, final Long count) {
this.month = month;
this.count = count;
}
}
domainResult.apply(
BigQueryIO.<TornadoSchema>write()
// 追加
.useBeamSchema()
.to(options.getOutput())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
는 Scheema를 사용하지 않는 경우에 비해 장소의 교체 부분이 없어 매끄럽다.주의
Create.of에서 Schema를 사용할 때는 pom에서 플러그인 지정가 필요합니다.
읽을 때 어때요?
조사 중...
읽을 때도 GenericRecord(Avoro)부터 교체하기 때문에 편하다.
Reference
이 문제에 관하여(Beam에서 Scheema를 사용하면 BigQuery의 쓰기가 쉬워집니다.), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://zenn.dev/notrogue/articles/64fb1dac28a6ba텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)