[Airflow] 파이톤 Operator를 통해 BigQuery에 연결

11498 단어 BigQueryairflow

개시하다


CloudComposier를 사용하지 않고 GCE 내 Docker에서 Airflow를 활용합니다.
에어플로우로 빅큐리를 제어해야 하기 때문에 연결이 조금 고생스러우니 미리 적어두세요.
BigQuery Operator로 제어하는 방법은 많지만
Pythhone Operator에서는 구글 SDK를 이용해 빅큐리를 제어하는 방법이 거의 없다.
(각자 Operator로 컨트롤해야 한다지만 파이톤으로 자유롭게 만들고 싶다.)

TL;DR


<잘못된 내용>

client = bigquery.Client()
tables = client.list_tables(dataset_id)
↓ 오류 출력
google.api_core.exceptions.Forbidden: 403 GET https://bigquery.googleapis.com/bigquery/v2/projects/[project]/datasets/[dataset]/tables?prettyPrint=false: Request had insufficient authentication scopes.

<포함된 이유>


서비스 계정을 불러오지 않았을 뿐입니다.

<제거 방법>


구현서비스 계정 키 파일로 인증에 기재된 내용만 있으면 됩니다.

Detail


1. 서비스 계정 키 게시


서비스 계좌 열쇠를 발행하다.
발행 순서는 다른 사이트를 참조하세요.

2. 내보낸 json을 디렉토리에 구성합니다.


실례적인 docker 링크 디렉터리에 json 파일을 설정합니다.
링크된 디렉토리는 docker-compose.yaml의volumes입니다.
Docker 내 Airflow에 대한 링크 디렉토리는 설정에 따라 다릅니다.나는 cfg에 기록이 있다고 생각한다.
cfg를 읽는 것이 번거롭고 길다고 생각되면 실제로 docker에 들어가서 확인하는 것이 좋습니다.
docker ps
・・・ Docker名とかが出てくる
docker exec -it [web_serverのDocker名] bash
초기 설치 방법은 타방면의 보도를 참조하십시오.

3. 코드를 실제로 써 본다


다만 일반적으로 참고서비스 계정 키 파일로 인증했지만, 실제 코드를 미리 마스크 형태로 게재했다.
oO(테스트 코드도 자신)
내용은 데이터 집합의 표 일람의 출력이다.
인증 부분이credentials로 설정된 서비스 -account, 키.json을 호출하여 설정합니다.
그 내용을bigquery.Client 매개변수를 사용하여 전달합니다.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta, timezone
import pendulum

from google.oauth2 import service_account
from google.cloud import bigquery


JST = timezone(timedelta(hours=+9), 'JST')
batch_date = (datetime.now(JST)).strftime('%Y%m%d')
project = "projectName"
detaset = "datasetName"
credentials = service_account.Credentials.from_service_account_file(
    "/opt/airflow/credential/key.json", scopes=["https://www.googleapis.com/auth/cloud-platform"],
)


def start(**kwargs):
    print("start job")
    print(batch_date)


def getTables(dataset_id):
    datasetId = project + "." + detaset
    print(datasetId)  # projectName.datasetName
    client = bigquery.Client(credentials=credentials,
                             project=credentials.project_id,)
    tables = client.list_tables(datasetId)

    for table in tables:
        print("{}.{}.{}".format(table.project, table.dataset_id, table.table_id))
        ## projectName.datasetName.tableName


with DAG(
    'viewTables',
    catchup=False,
    start_date=datetime(2021, 4, 25, 00, 00,
                        tzinfo=pendulum.timezone('Asia/Tokyo')),
    schedule_interval=timedelta(minutes=60),
    default_args={'owner': 'airflow'},
    tags=['tag'],
) as dag:

    startTask = PythonOperator(
        task_id='start',
        provide_context=True,
        python_callable=start,
    )

    getTablesTask = PythonOperator(
        task_id='get_tables',
        provide_context=True,
        python_callable=getTables,
    )

startTask >> getTablesTask

4. BigQuery Operater에서도 사용 가능


파이썬 Operator와 관련이 없지만 BigQuery Operater를 사용하는 경우에도 키 정보가 필요합니다.
Cloud Composter는 자동으로 이 근처에 있을 수 있습니까?스스로 일어설 경우 UI에서 다음을 설정해야 합니다.
bigquery_default는 변경할 수 있습니다. Cloud Composier의 표준 이름입니까?



이렇게 사용합니다.
t2 = BigQueryOperator(
    task_id='copy_table',
    sql="""
    select * from `[source_projectName].[source_datasetName].[source_tableName]`
    """,
    destination_dataset_table='dest_projectName.dest_databaseName.dest_tableName',
    use_legacy_sql=False,
    bigquery_conn_id='bigquery_default',
    dag=dag,
)

최후


에어플로우는 많은 곳에서 고생한다.
Cloud Composier 관리자라면 이 근처가 해제될까요??

좋은 웹페이지 즐겨찾기