세션이 있는 여러 쿼리에 대한 BigQuery 트랜잭션

BigQuery는 작년부터 트랜잭션을 지원합니다(Google Cloud Next'21에서 발표). 이제 하나 이상의 테이블에 대해 변경 작업을 수행한 다음 사이에 스크립트를 래핑하여 결과를 원자적으로 커밋하거나 롤백할 수 있습니다.

BEGIN TRANSACTION;


그리고

COMMIT TRANSACTION;


또는

ROLLBACK TRANSACTION;


충분히 쉽습니다! 모든 것이 설명됩니다in the official documentation.

그러나 트랜잭션에는 제한이 있습니다.

A transaction cannot span multiple scripts.



이것은 대부분의 경우 문제가 되지 않지만 트랜잭션에 포함된 스크립트가 너무 복잡해지거나 쿼리 매개변수가 너무 많거나 다른 스크립트가 중단되는 경우 문제가 될 수 있습니다quota of BigQuery jobs. 예를 들어 쿼리 스크립트가 요청 페이로드에서 자동 생성될 때 이러한 상황이 발생할 수 있습니다.

BigQuery 세션을 사용하면 해결 방법이 있습니다. 어떻게 작동하는지 봅시다

BigQuery 세션



Sessions은 작업을 연결하고 작업 간에 임시 테이블과 같은 임시 데이터를 유지하는 방법입니다.

세션에 대한 한 가지 일반적인 사용 사례는 정확히 우리가 원하는 것입니다.

Create multi-statement transactions over multiple queries. Within a session, you can begin a transaction, make changes, and view the temporary result before deciding to commit or rollback. You can do this over several queries in the session. If you do not use a session, a multi-statement transaction needs to be completed in a single query.



아이디어는 BEGIN TRANSACTION; 로 시작하여 COMMIT TRANSACTION; 로 끝나는 동일한 세션 내에서 트랜잭션 쿼리를 쌓는 것입니다.
그 사이에 필요한 만큼 많은 쿼리를 호출할 수 있으며 전체 세션은 원자적 동작을 하게 됩니다.

24시간 동안 활동이 없으면 세션이 자동으로 닫힙니다. 그러나 트랜잭션과 혼합되면 대상 테이블이 세션에서 "잠겨"세션이 끝날 때까지 사용할 수 없게 될 수 있습니다. 그렇기 때문에 스크립트 끝에서 세션을 강제 종료하는 것이 좋습니다. 다음 쿼리를 호출하여 수행됩니다.

CALL BQ.ABORT_SESSION();


파이썬 구현



우리는 처리가 끝날 때 열고 항상 닫아야 하는 세션의 개념을 다루고 있습니다. 이를 위해 자연스럽게 컨텍스트 관리자가 표시됩니다.

"""ContextManager wrapping a bigquery session."""
from google.cloud import bigquery


class BigquerySession:
    """ContextManager wrapping a bigquerySession."""

    def __init__(self, bqclient: bigquery.Client, bqlocation: str = "EU") -> None:
        """Construct instance."""
        self._bigquery_client = bqclient
        self._location = bqlocation
        self._session_id = None

    def __enter__(self) -> str:
        """Initiate a Bigquery session and return the session_id."""
        job = self._bigquery_client.query(
            "SELECT 1;",  # a query can't fail
            job_config=bigquery.QueryJobConfig(create_session=True),
            location=self._location,
        )
        self._session_id = job.session_info.session_id
        job.result()  # wait job completion
        return self._session_id

    def __exit__(self, exc_type, exc_value, traceback):
        """Abort the opened session."""
        if self._session_id:
            # abort the session in any case to have a clean state at the end
            # (sometimes in case of script failure, the table is locked in
            # the session)
            job = self._bigquery_client.query(
                "CALL BQ.ABORT_SESSION();",
                job_config=bigquery.QueryJobConfig(
                    create_session=False,
                    connection_properties=[
                        bigquery.query.ConnectionProperty(
                            key="session_id", value=self._session_id
                        )
                    ],
                ),
                location=self._location,
            )
            job.result()



그런 다음 이 컨텍스트를 사용하여 작업을 단일 세션으로 쌓는 것이 정말 쉬워져 다중 문, 다중 스크립트 bigquery 트랜잭션을 생성합니다.

with BigquerySession(self.bigquery_client, BIGQUERY_LOCATION) as session_id:
    # open transaction
    job = self.bigquery_client.query(
        "BEGIN TRANSACTION;",
        job_config=bigquery.QueryJobConfig(
            create_session=False,
            connection_properties=[
                bigquery.query.ConnectionProperty(
                    key="session_id", value=session_id
                )
            ],
        ),
        location=BIGQUERY_LOCATION,
    )
    job.result()
    # stack queries
    for queryscript in scripts:
        job = self.bigquery_client.query(
            queryscript,
            job_config=bigquery.QueryJobConfig(
                create_session=False,
                connection_properties=[
                    bigquery.query.ConnectionProperty(
                        key="session_id", value=session_id
                    )
                ],
            ),
            location=BIGQUERY_LOCATION,
        )
        job.result()
    # end transaction
    job = self.bigquery_client.query(
        "COMMIT TRANSACTION;",
        job_config=bigquery.QueryJobConfig(
            create_session=False,
            connection_properties=[
                bigquery.query.ConnectionProperty(
                    key="session_id", value=session_id
                )
            ],
        ),
        location=BIGQUERY_LOCATION,
    )
    job.result()



모든 작업이 동일한 session_id(즉, 동일한 세션 내) 및 동일한 위치(이는 세션의 요구 사항임)로 어떻게 실행되는지 확인하십시오.

도움이 되었기를 바랍니다 !


Unsplash의 Caroline Selfors 사진

좋은 웹페이지 즐겨찾기