Go 언어로 BigQuery에 레코드 입력

24091 단어 GoBigQuerytech
BigQuery에 데이터를 투입하는 방법Google Cloud Client Libraries for Go을 사용합니다.

데이터 읽기 지침


Go 언어용 라이브러리 이외의 BigQuery에 데이터를 투입하는 방법은 데이터 로드 개요에 기재되어 있습니다.Go 언어용 라이브러리로 한정되면 다음과 같은 방법이 있다.
  • 배치 - 읽기 작업
  • 스트리밍 API
  • BigQuery Storage Write API
  • 기타
  • 배치 - 읽기 작업


    읽기 작업은 대량 읽기 위해 Google Cloud Storage에 지정된 파일 또는 로컬로 배치된 파일을 읽습니다.읽을 파일 형식은 CSV 등을 선택할 수 있습니다.
    GCS의 파일을 읽는 샘플 코드는 다음과 같습니다.
    별다른 일은 하지 않고 GCS에 있는 파일 참조를 만들어 마운트기→작업을 생성하고 작업이 끝날 때까지 기다린다.이렇게 하면 GCS의 파일을 BigQuery에서 읽을 수 있습니다.물론 빅Query에서 읽은 후에도 GCS에 파일이 남아 있기 때문에 빅Query에 데이터를 투입하기 위해 GCS를 이용한다면 파일의 설정과 삭제 작업이 필요하다.
    func loading(projectID string, bqDataset string, bqTable string, bucket string, objectkey string) int {
    	ctx := context.Background()
    	bqCli, err := bigquery.NewClient(ctx, projectID)
    	if err != nil {
    		log.Printf("failed to create bigquery client: %v", err)
    		return 2
    	}
    	dataset := bqCli.Dataset(bqDataset)
    	table := dataset.Table(bqTable)
    
    	gcsRef := bigquery.NewGCSReference(fmt.Sprintf("gs://%s/%s", bucket, objectkey))
    	gcsRef.AllowJaggedRows = true // treat missing row as nil
    	loader := table.LoaderFrom(gcsRef)
    	loader.CreateDisposition = bigquery.CreateNever // treat error if table not exist
    	job, err := loader.Run(ctx)
    
    	for {
    		status, err := job.Wait(ctx)
    		if err != nil {
    			log.Printf("failed to poll job status: %v", err)
    			return 2
    		}
    		if status.Done() {
    			if status.Err() != nil {
    				log.Printf("Job failed: %v", status.Err())
    				return 2
    			}
    			break
    		}
    	}
    
    	log.Print("Sucessfully load data into BigQuery")
    
    	return 0
    }
    
    로컬 파일을 읽는 샘플 코드는 다음과 같습니다.
    처리 프로세스는 GCS에서 읽을 때와 기본적으로 같습니다. 로더를 생성할 때만 gcs ref를 지정하거나 로컬 파일을 지정하는 차이입니다.
    func loadingLocal(projectID string, bqDataset string, bqTable string, filepath string) int {
    	ctx := context.Background()
    	bqCli, err := bigquery.NewClient(ctx, projectID)
    	if err != nil {
    		log.Printf("failed to create bigquery client: %v", err)
    		return 2
    	}
    	dataset := bqCli.Dataset(bqDataset)
    	table := dataset.Table(bqTable)
    
    	log.Printf("load rows from local file %s", filepath)
    
    	f, err := os.Open(filepath)
    	if err != nil {
    		log.Printf("failed to open file %s: %v", filepath, err)
    		return 2
    	}
    	source := bigquery.NewReaderSource(f)
    	// source.AutoDetect = true
    	source.SkipLeadingRows = 1
    	loader := table.LoaderFrom(source)
    	job, err := loader.Run(ctx)
    
    	for {
    		status, err := job.Wait(ctx)
    		if err != nil {
    			log.Printf("failed to poll job status: %v", err)
    			return 2
    		}
    		if status.Done() {
    			if status.Err() != nil {
    				log.Printf("Job failed: %v", status.Err())
    				return 2
    			}
    			break
    		}
    	}
    
    	log.Print("Sucessfully load data into BigQuery")
    
    	return 0
    }
    

    스트리밍 API


    스트리밍 API는 이미 권장되지 않는 방법입니다.현재 상태도 지원되지만 Starge Write API를 사용하는 것이 좋습니다.하지만 Go 언어의 SDK에서 Storage Write API는 미리보기 처리(2022-05-01 현재), 스트리밍 API를 활용하는 장면도 있다.
    스트리밍 API를 사용하는 샘플 코드는 다음과 같습니다.
    이른바 insertAll 방법(Google 측 API는 insertAll, SDK에서는 insertAll이 아닌 Put)을 사용해 구조체의 슬라이드를 전달하고 있다.또한 구조에 맞는 방법으로 세이브(ValueSaver 인터페이스)를 정의최선을 다하는 중복 제거할 수 있다.Save는 설치하지 않아도 됩니다.
    type Item struct {
    	Name  string
    	Size  float64
    	Count int
    }
    
    func (i *Item) Save() (map[string]bigquery.Value, string, error) {
    	return map[string]bigquery.Value{
    		"name": i.Name,
    		"size": i.Size,
    		"count": i.Count,
    	}, i.Name, nil
    }
    
    func uploading(projectID string, bqDataset string, bqTable string) int {
    	ctx := context.Background()
    	bqCli, err := bigquery.NewClient(ctx, projectID)
    	if err != nil {
    		log.Printf("failed to create bigquery client: %v", err)
    		return 2
    	}
    	dataset := bqCli.Dataset(bqDataset)
    	table := dataset.Table(bqTable)
    
    	log.Printf("load rows from stream")
    
    	inserter := table.Inserter()
    	items := []*Item{
    		{Name: "s1", Size: 32.6, Count: 7},
    		{Name: "s2", Size: 4, Count: 2},
    		{Name: "s3", Size: 101.5, Count: 1},
    	}
    	if err := inserter.Put(ctx, items); err != nil {
    		log.Printf("failed to put items: %v", err)
    		return 2
    	}
    
    	return 0
    }
    

    Storage Write API


    Storage Write API는 대량 읽기 및 스트리밍을 지원하는 새로운 API입니다.그러나 Go 언어 중 2022-05-01은 현재 미리보기 처리로 앞으로 어떻게 사용할지는 불분명하다.
    API 읽기golang-samples에 샘플 코드가 있습니다.그러나 Write API의 샘플 코드 단편만 정비된 것이 아니라 어떻게 실시해야 좋을지 모르겠다.StackOverflow에서 같은 문제(2021-12)를 열거했고 Protocol 버퍼에 대한 정의도 필요했다. 하고 싶은 일에 대해toomuch라고 느꼈다.기타 언어의 문서 견본 Protocol Buffer도 사용하기 때문에 Protocol Buffer는 필수일 수 있습니다.

    기타


    다른 방법으로는 SQL(DML) 사용 방법 등이 있지만 생략한다.

    참고 자료

  • Introduction to loading data - Bigquery Guides
  • BigQuery API Reference
  • BigQuery Storage API Reference
  • Google Cloud Client Libraries for Go
  • Google Cloud Client Libraries for Go (repo)
  • 좋은 웹페이지 즐겨찾기