Cloud Dataflow에서 BigQuery에 스트리밍 삽입할 때 약간의 주의

문제



Dataflow에서 Build-in I/O 라이브러리을 사용하여 BigQuery에 스트리밍 인서트로 데이터를 삽입하려고 시도하지 않는 오류가 발생했습니다.

이런 코드



pubsub2bigquery.py
import argparse
import logging
import json

from past.builtins import unicode

import apache_beam as beam

from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class JSONStringToDictFn(beam.DoFn):
    def process(self, element):
        items = json.loads(element)

        yield items

def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the wordcount pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
      '--input_subscription',
      required=True,
      help=(
          'Input PubSub subscription '
          '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".'))
    parser.add_argument(
      '--output_dataset',
      required=True,
      help=(
          'Output BigQuery dataset '
          '"<PROJECT>.<DATASET>"'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args, streaming=True)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:

        subscription = known_args.input_subscription
        rides = (
            p
            | 'Read' >> ReadFromPubSub(subscription=subscription).with_output_types(bytes)
            | 'ToDict' >> beam.ParDo(JSONStringToDictFn())
        )

        (bigquery_project, dataset) = known_args.output_dataset.split('.')
        rides | 'Write rides to BigQuery' >> WriteToBigQuery(table,
                                                             dataset=dataset,
                                                             project=bigquery_project)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

이런 프로그램을 다음과 같이 실행하면 에러가 발생.
python pubsub2bigquery.py \
--project <PROJECT ID> \
--region='us-central1' \
--runner DataflowRunner \
--autoscaling_algorithm=THROUGHPUT_BASED \
--max_num_workers=50 \
--input_subscription <SUBSCRIPTION> \
--output_dataset <DATASET ID> \
--temp_location=<GCP PATH for temp> \
--staging_location=<GCP PATH for staging>

수중 환경에서는 자동 스케일링의 다음 옵션을 빼면, 즉 작업자 1에서 실행하면 오류는 발생하지 않습니다.
--autoscaling_algorithm=THROUGHPUT_BASED \
--max_num_workers=50 \

오류



전문은 발췌하지 않지만, 다음과 같은 내용을 포함한 에러가 로그에 나옵니다.

"/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse http_response, method_config=method_config, request=request) RuntimeError: apitools.base. exceptions.HttpForbiddenError: HttpError accessing htps : // 비 g 쿠에 ry. ㅇㅜㅜㅜㅜ 이 m/비g 쿠에 ry/v2/p 로지ぇcts/슈아-gcp보오 k/다타세 ts/nyc_타ぃ_티p/타bぇs/레아l치메_리로 s? 아 lt = j : response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Fri, 28 Aug 2020 03:40:42 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options ': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '403', 'content-length': '560', '- content-encoding': 'gzip'}>, content <{ "error": { "code": 403, "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see htps : // c ぉ d. 오, ぇ. 코 m / 비 g 쿠에 ry / t 로 b ", "errors": [ { "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see htps : // c ぉ d. 오, ぇ. 코 m / 비 g 쿠에 ry / t 로 b ", "domain": "usageLimits", "r eason": "rateLimitExceeded"} ], "status": "PERMISSION_DENIED"} } > [while running 'generatedPtransform-121843']

아래쪽에 "code": 403라든지 "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors"라든지 "reason": "rateLimitExceeded"라든지 있으므로, API콜의 상한에 걸린 것 같습니다.

음, 스트리밍 인서트에 너무 많이 씁니까? . .
라고 생각했습니다만, 공식 문서의 이하대로, 쿼터는 없을 것.



좀 더 잘 로그를 보면. . .

"/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1126, in process bigquery_tools.parse_table_reference(destination), schema) File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1112, in _create_table_if_needed additional_create_parameters=self.additional_bq_parameters)
(생략)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 534, in get_table response = self.client.tables.Get(request)
_create_table_if_needed ? get_table ? 분명히 테이블 정보의 취득 다음의 API 조작 걸려 있는 모양. BigQuery에 쓰는 곳의 apache_beam.io.gcp.bigquery.WriteToBigQuerycreate_disposition 파라미터가 있어 이것이 디폴트로 CREATE_IF_NEEDED 로 되어 있는 것 같아, 아마 스트리밍 인서트 실행시에 테이블의 존재 확인을 위해 참조하고 있는 모양.

테이블은 기본이며, 이런 움직임은 불필요하므로 이 파라미터를 이하와 같이 CREATE_NEVER 로 해 버린다.

fixed.py
        rides | 'Write rides to BigQuery' >> WriteToBigQuery('realtime_rides',
                                                             dataset=dataset,
                                                             project=bigquery_project,
                                                             create_disposition=BigQueryDisposition.CREATE_NEVER)

import도 잊지 않고.

fixed.py
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition

이제 오류가 발생하지 않습니다! 여러분 조심하세요.

참고



Apache Beam 커뮤니티에도 버그 티켓이 있습니다.

덧붙여서, Stackoverflow에도 같은 고민 쪽이 있었습니다.

좋은 웹페이지 즐겨찾기