Cloud Dataflow에서 BigQuery에 스트리밍 삽입할 때 약간의 주의
문제
Dataflow에서 Build-in I/O 라이브러리을 사용하여 BigQuery에 스트리밍 인서트로 데이터를 삽입하려고 시도하지 않는 오류가 발생했습니다.
이런 코드
pubsub2bigquery.pyimport argparse
import logging
import json
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class JSONStringToDictFn(beam.DoFn):
def process(self, element):
items = json.loads(element)
yield items
def run(argv=None, save_main_session=True):
"""Main entry point; defines and runs the wordcount pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_subscription',
required=True,
help=(
'Input PubSub subscription '
'"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".'))
parser.add_argument(
'--output_dataset',
required=True,
help=(
'Output BigQuery dataset '
'"<PROJECT>.<DATASET>"'))
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args, streaming=True)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as p:
subscription = known_args.input_subscription
rides = (
p
| 'Read' >> ReadFromPubSub(subscription=subscription).with_output_types(bytes)
| 'ToDict' >> beam.ParDo(JSONStringToDictFn())
)
(bigquery_project, dataset) = known_args.output_dataset.split('.')
rides | 'Write rides to BigQuery' >> WriteToBigQuery(table,
dataset=dataset,
project=bigquery_project)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
이런 프로그램을 다음과 같이 실행하면 에러가 발생.
python pubsub2bigquery.py \
--project <PROJECT ID> \
--region='us-central1' \
--runner DataflowRunner \
--autoscaling_algorithm=THROUGHPUT_BASED \
--max_num_workers=50 \
--input_subscription <SUBSCRIPTION> \
--output_dataset <DATASET ID> \
--temp_location=<GCP PATH for temp> \
--staging_location=<GCP PATH for staging>
수중 환경에서는 자동 스케일링의 다음 옵션을 빼면, 즉 작업자 1에서 실행하면 오류는 발생하지 않습니다.
--autoscaling_algorithm=THROUGHPUT_BASED \
--max_num_workers=50 \
오류
전문은 발췌하지 않지만, 다음과 같은 내용을 포함한 에러가 로그에 나옵니다.
"/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse http_response, method_config=method_config, request=request) RuntimeError: apitools.base. exceptions.HttpForbiddenError: HttpError accessing htps : // 비 g 쿠에 ry. ㅇㅜㅜㅜㅜ 이 m/비g 쿠에 ry/v2/p 로지ぇcts/슈아-gcp보오 k/다타세 ts/nyc_타ぃ_티p/타bぇs/레아l치메_리로 s? 아 lt = j : response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Fri, 28 Aug 2020 03:40:42 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options ': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '403', 'content-length': '560', '- content-encoding': 'gzip'}>, content <{ "error": { "code": 403, "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see htps : // c ぉ d. 오, ぇ. 코 m / 비 g 쿠에 ry / t 로 b ", "errors": [ { "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see htps : // c ぉ d. 오, ぇ. 코 m / 비 g 쿠에 ry / t 로 b ", "domain": "usageLimits", "r eason": "rateLimitExceeded"} ], "status": "PERMISSION_DENIED"} } > [while running 'generatedPtransform-121843']
아래쪽에 "code": 403
라든지 "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors"
라든지 "reason": "rateLimitExceeded"
라든지 있으므로, API콜의 상한에 걸린 것 같습니다.
음, 스트리밍 인서트에 너무 많이 씁니까? . .
라고 생각했습니다만, 공식 문서의 이하대로, 쿼터는 없을 것.
좀 더 잘 로그를 보면. . .
"/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1126, in process bigquery_tools.parse_table_reference(destination), schema) File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1112, in _create_table_if_needed additional_create_parameters=self.additional_bq_parameters)
(생략)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 534, in get_table response = self.client.tables.Get(request)
_create_table_if_needed
? get_table
? 분명히 테이블 정보의 취득 다음의 API 조작 걸려 있는 모양. BigQuery에 쓰는 곳의 apache_beam.io.gcp.bigquery.WriteToBigQuery
로 create_disposition
파라미터가 있어 이것이 디폴트로 CREATE_IF_NEEDED
로 되어 있는 것 같아, 아마 스트리밍 인서트 실행시에 테이블의 존재 확인을 위해 참조하고 있는 모양.
테이블은 기본이며, 이런 움직임은 불필요하므로 이 파라미터를 이하와 같이 CREATE_NEVER
로 해 버린다.
fixed.py rides | 'Write rides to BigQuery' >> WriteToBigQuery('realtime_rides',
dataset=dataset,
project=bigquery_project,
create_disposition=BigQueryDisposition.CREATE_NEVER)
import도 잊지 않고.
fixed.pyfrom apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
이제 오류가 발생하지 않습니다! 여러분 조심하세요.
참고
Apache Beam 커뮤니티에도 버그 티켓이 있습니다.
덧붙여서, Stackoverflow에도 같은 고민 쪽이 있었습니다.
Reference
이 문제에 관하여(Cloud Dataflow에서 BigQuery에 스트리밍 삽입할 때 약간의 주의), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/ShuA/items/1b2ffed37a024c3dae07
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
pubsub2bigquery.py
import argparse
import logging
import json
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class JSONStringToDictFn(beam.DoFn):
def process(self, element):
items = json.loads(element)
yield items
def run(argv=None, save_main_session=True):
"""Main entry point; defines and runs the wordcount pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_subscription',
required=True,
help=(
'Input PubSub subscription '
'"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".'))
parser.add_argument(
'--output_dataset',
required=True,
help=(
'Output BigQuery dataset '
'"<PROJECT>.<DATASET>"'))
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args, streaming=True)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as p:
subscription = known_args.input_subscription
rides = (
p
| 'Read' >> ReadFromPubSub(subscription=subscription).with_output_types(bytes)
| 'ToDict' >> beam.ParDo(JSONStringToDictFn())
)
(bigquery_project, dataset) = known_args.output_dataset.split('.')
rides | 'Write rides to BigQuery' >> WriteToBigQuery(table,
dataset=dataset,
project=bigquery_project)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
이런 프로그램을 다음과 같이 실행하면 에러가 발생.
python pubsub2bigquery.py \
--project <PROJECT ID> \
--region='us-central1' \
--runner DataflowRunner \
--autoscaling_algorithm=THROUGHPUT_BASED \
--max_num_workers=50 \
--input_subscription <SUBSCRIPTION> \
--output_dataset <DATASET ID> \
--temp_location=<GCP PATH for temp> \
--staging_location=<GCP PATH for staging>
수중 환경에서는 자동 스케일링의 다음 옵션을 빼면, 즉 작업자 1에서 실행하면 오류는 발생하지 않습니다.
--autoscaling_algorithm=THROUGHPUT_BASED \
--max_num_workers=50 \
오류
전문은 발췌하지 않지만, 다음과 같은 내용을 포함한 에러가 로그에 나옵니다.
"/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse http_response, method_config=method_config, request=request) RuntimeError: apitools.base. exceptions.HttpForbiddenError: HttpError accessing htps : // 비 g 쿠에 ry. ㅇㅜㅜㅜㅜ 이 m/비g 쿠에 ry/v2/p 로지ぇcts/슈아-gcp보오 k/다타세 ts/nyc_타ぃ_티p/타bぇs/레아l치메_리로 s? 아 lt = j : response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Fri, 28 Aug 2020 03:40:42 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options ': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '403', 'content-length': '560', '- content-encoding': 'gzip'}>, content <{ "error": { "code": 403, "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see htps : // c ぉ d. 오, ぇ. 코 m / 비 g 쿠에 ry / t 로 b ", "errors": [ { "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see htps : // c ぉ d. 오, ぇ. 코 m / 비 g 쿠에 ry / t 로 b ", "domain": "usageLimits", "r eason": "rateLimitExceeded"} ], "status": "PERMISSION_DENIED"} } > [while running 'generatedPtransform-121843']
아래쪽에
"code": 403
라든지 "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors"
라든지 "reason": "rateLimitExceeded"
라든지 있으므로, API콜의 상한에 걸린 것 같습니다.음, 스트리밍 인서트에 너무 많이 씁니까? . .
라고 생각했습니다만, 공식 문서의 이하대로, 쿼터는 없을 것.
좀 더 잘 로그를 보면. . .
"/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1126, in process bigquery_tools.parse_table_reference(destination), schema) File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1112, in _create_table_if_needed additional_create_parameters=self.additional_bq_parameters)
(생략)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 534, in get_table response = self.client.tables.Get(request)
_create_table_if_needed
? get_table
? 분명히 테이블 정보의 취득 다음의 API 조작 걸려 있는 모양. BigQuery에 쓰는 곳의 apache_beam.io.gcp.bigquery.WriteToBigQuery
로 create_disposition
파라미터가 있어 이것이 디폴트로 CREATE_IF_NEEDED
로 되어 있는 것 같아, 아마 스트리밍 인서트 실행시에 테이블의 존재 확인을 위해 참조하고 있는 모양.테이블은 기본이며, 이런 움직임은 불필요하므로 이 파라미터를 이하와 같이
CREATE_NEVER
로 해 버린다.fixed.py
rides | 'Write rides to BigQuery' >> WriteToBigQuery('realtime_rides',
dataset=dataset,
project=bigquery_project,
create_disposition=BigQueryDisposition.CREATE_NEVER)
import도 잊지 않고.
fixed.py
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
이제 오류가 발생하지 않습니다! 여러분 조심하세요.
참고
Apache Beam 커뮤니티에도 버그 티켓이 있습니다.
덧붙여서, Stackoverflow에도 같은 고민 쪽이 있었습니다.
Reference
이 문제에 관하여(Cloud Dataflow에서 BigQuery에 스트리밍 삽입할 때 약간의 주의), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/ShuA/items/1b2ffed37a024c3dae07텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)