GCP 사용료를 슬랙의 with Airflow에 일일 공지

51758 단어 Airflowtech

개요


GCP를 사용하지만 클라우드의 파산을 피하고 싶습니다.예산 경보를 통해 알려지기 전에 눈치채고 싶어요...따라서 DAG는 "CloudBilling이 BigQuery에 출력하는 기능"과 "Airflow"를 사용하여 슬랙에 향후 사용료를 알리는 데 사용되는 DAG를 만든다.

하고 싶은 일

  • 다음날 슬랙에 이용료 공지
  • 이때 프로젝트와 서비스 명칭의 가격도 알 수 있다.
  • BigQuery에서 사용된 데이터(알림된 데이터) 정리
  • 공지된 데이터를 CSV로 로컬로 저장합니다.
  • 또 데이터 양을 줄이기 위해 6일 전의 데이터를 삭제했다.
  • 절차.


    사전 준비: Cloud Billing 데이터를 BigQuery로 내보내기

  • 여기서 정리한 대로 진행했습니다.
  • https://cloud.google.com/billing/docs/how-to/export-data-bigquery?hl=ja
  • 는'표준적인 사용료 데이터'를 사용했다.
  • 다양한 설정은 BigQuery 내에 데이터 set을 만들고 다음 데이터를 생성합니다.
  • 각 열에 대해
  • https://cloud.google.com/billing/docs/how-to/export-data-bigquery-tables?hl=ja#standard-usage-cost-data-schema
  • 신중을 기하기 위해 숨기는 것이 가장 좋은value는'XXX'다.
  • {  "billing_account_id": "XXX",  "service": {    "id": "XXX",    "description": "Cloud Logging"  },  "sku": {    "id": "143F-A1B0-E0BE",    "description": "Log Volume"  },  "usage_start_time": "2022-04-20T02:00:00Z",  "usage_end_time": "2022-04-20T03:00:00Z",  "project": {    "id": "XXX",    "number": "61685520625",    "name": "XXX",    "labels": [],    "ancestry_numbers": null,    "ancestors": [{      "resource_name": "XXX",      "display_name": "XXX"    }]  },  "labels": [{    "key": "goog-resource-type",    "value": "bigquery_dataset"  }],  "system_labels": [],  "location": {    "location": "us",    "country": "US",    "region": null,    "zone": null  },  "export_time": "2022-04-20T05:55:41.063Z",  "cost": "0.0",  "currency": "JPY",  "currency_conversion_rate": "122.83500000551589",  "usage": {    "amount": "933.0",    "unit": "bytes",    "amount_in_pricing_units": "8.6892396211624146e-07",    "pricing_unit": "gibibyte"  },  "credits": [],  "invoice": {    "month": "202204"  },  "cost_type": "regular",  "adjustment_info": null}
    {  "billing_account_id": "XXX",  "service": {    "id": "XXX",    "description": "Cloud Logging"  },  "sku": {    "id": "143F-A1B0-E0BE",    "description": "Log Volume"  },  "usage_start_time": "2022-04-20T00:00:00Z",  "usage_end_time": "2022-04-20T01:00:00Z",  "project": {    "id": "XXX",    "number": "61685520625",    "name": "XXX",    "labels": [],    "ancestry_numbers": null,    "ancestors": [{      "resource_name": "XXX",      "display_name": "XXX"    }]  },  "labels": [{    "key": "goog-resource-type",    "value": "bigquery_project"  }],  "system_labels": [],  "location": {    "location": "us",    "country": "US",    "region": null,    "zone": null  },  "export_time": "2022-04-20T04:04:18.373Z",  "cost": "0.0",  "currency": "JPY",  "currency_conversion_rate": "122.83500000551589",  "usage": {    "amount": "8498.0",    "unit": "bytes",    "amount_in_pricing_units": "7.9143792390823364e-06",    "pricing_unit": "gibibyte"  },  "credits": [],  "invoice": {    "month": "202204"  },  "cost_type": "regular",  "adjustment_info": null}
    {  "billing_account_id": "XXX",  "service": {    "id": "XXX",    "description": "Cloud Logging"  },  "sku": {    "id": "143F-A1B0-E0BE",    "description": "Log Volume"  },  "usage_start_time": "2022-04-20T00:00:00Z",  "usage_end_time": "2022-04-20T01:00:00Z",  "project": {    "id": "XXX",    "number": "61685520625",    "name": "XXX",    "labels": [],    "ancestry_numbers": null,    "ancestors": [{      "resource_name": "XXX",      "display_name": "XXX"    }]  },  "labels": [{    "key": "goog-resource-type",    "value": "bigquery_dataset"  }],  "system_labels": [],  "location": {    "location": "us",    "country": "US",    "region": null,    "zone": null  },  "export_time": "2022-04-20T04:04:18.373Z",  "cost": "0.0",  "currency": "JPY",  "currency_conversion_rate": "122.83500000551589",  "usage": {    "amount": "4609.0",    "unit": "bytes",    "amount_in_pricing_units": "4.2924657464027405e-06",    "pricing_unit": "gibibyte"  },  "credits": [],  "invoice": {    "month": "202204"  },  "cost_type": "regular",  "adjustment_info": null}
    
  • 출력된 데이터에서'.export time'에서 임의의 날짜의 식별을 프로젝트 이름,'.project.name'의 서비스 이름,'.service.description'의 서비스 이름,'.cost'의 프로젝트의 모든 서비스 비용의 식별으로 사용한다.
  • DAG 전체 차트


  • prepare task
  • 각종 준비된 작업(수동 실행, 자동 실행 시 얻은 데이터 날짜의 분할, Xcom의push 등)에 사용
  • main_task_group group
  • extract: sql을 사용하여 BigQuery에서 CSV로 데이터를 출력합니다.
  • transform:extract를 통해 출력된 CSV 데이터를 가공하여 슬랙에 알릴 수 있는 text를 조립합니다.
  • プロジェクトA,サービスA,0.0,
    プロジェクトA,サービスB,0.0,
    プロジェクトA,サービスC,155.30447499999997,
    
    예를 들어 상기와 같은 데이터의 경우 -> 아래와 같은 text를 조립한다.
    2022-04-14のGCP使用料金
    プロジェクトA----
    ・サービスC ¥156
    ・サービスA ¥0
    ・サービスB ¥0
    100円以上でした。
    
  • slack_슬랙에게 알립니다.
  • clean_group group
  • clean_6일 전의 데이터를 삭제합니다.
  • 설정된 Variables

  • bigquery_sa_keyppath: Biqquery 서비스 계정이 있는 키의 path
  • slack_token_API1: slackapi의 token
  • cost_데이터 소스: bq 데이터 소스
  • cost_table:bq의 table
  • main.py


    import uuid
    from datetime import timedelta
    from operator import itemgetter
    from airflow import DAG
    from airflow.models import Variable
    from airflow.macros import ds_add
    from airflow.utils.dates import days_ago
    from airflow.utils.task_group import TaskGroup
    from airflow.operators.python_operator import PythonOperator
    from airflow_dag_sample.operators.slack_api_post_operator\
        import SlackAPIPostOperatorAPI1
    from airflow_dag_sample.operators.bigquery_operator import BigQueryOperator
    
    
    default_args = {
        'owner': 'owner',
        'depends_on_past': False,
        'start_date': days_ago(2),
    }
    
    
    def get_extract_date(ti):
        return ti.xcom_pull(task_ids='prepare', key='extract_date')
    
    
    def get_delete_date(ti):
        return ti.xcom_pull(task_ids='prepare', key='delete_date')
    
    
    def _macros():
        return {
            'bq_table': f"{Variable.get('cost_dataset')}.dataset.{Variable.get('cost_table')}",
            'extract_date': get_extract_date,
            'delete_date': get_delete_date,
        }
    
    
    with DAG(
        'test_slack',
        default_args=default_args,
        description='日々のGCP使用料をSlackに通知するDAG',
        schedule_interval=timedelta(days=1),
        tags=["work"],
        user_defined_macros=_macros(),
    ) as dag:
        dag.doc_md = """\
        ## GCPの使用料金をSlackに通知するDAG
        ### 使用するVariable
        - bigquery_sa_keypath
        - slack_token_hatiware
        - cost_dataset
        - cost_table
        """
    
        def prepare(**kwargs):
            if kwargs['dag_run'].external_trigger \
                 and not kwargs['dag_run'].is_backfill:
                extract_date = kwargs['yesterday_ds']
                delete_date = ds_add(kwargs['yesterday_ds'], -5)
            else:
                extract_date = kwargs['ds']
                delete_date = ds_add(kwargs['ds'], -5)
            kwargs['ti'].xcom_push(key='extract_date', value=extract_date)
            kwargs['ti'].xcom_push(key='delete_date', value=delete_date)
    	# CSVの出力先
            local_log_dir = '/var/log/bigquery/'
            suffix = str(uuid.uuid4()).replace('-', '_')
            kwargs['ti'].xcom_push(
                key='local_log_filepath',
                value=f'{local_log_dir}{extract_date}_{suffix}.csv')
    
        prepare_task = PythonOperator(
            task_id="prepare",
            python_callable=prepare,
        )
    
        with TaskGroup(
             "main_task_group", tooltip="bqからextract_dateのgcp利用料金を抽出・加工し、Slackに送る"
             ) as main_group:
            def reader(filepath: str) -> list:
                with open(filepath) as f:
                    for row in f:
                        yield row.split(',')
    
            def check_total(total_cost: str) -> str:
                if total_cost >= 100:
                    return "100円以上でした。"
                else:
                    return "100円未満でした。"
    
            def create_text(extract_date: str, result_dict: dict) -> str:
                text = f"{extract_date}のGCP使用料金"
                total_cost = 0
                for project, values in result_dict.items():
                    text = text+f"\n{project}----"
    		# costが大きい順に並び替えて取り出す
                    for value in sorted(
                            values, key=itemgetter('cost'), reverse=True):
                        cost = int(float(value['cost']))
                        text = text+f"\n・{value['service']} ¥{cost}"
                        total_cost += cost
                text = text+f"\n\n{check_total(total_cost)}"
                return text
    
            def transform(**kwargs):
                filepath = kwargs['ti'].xcom_pull(key='local_log_filepath')
                result_dict = {}
                for row in reader(filepath):
                    if result_dict.get(row[0], None) is not None:
                        result_dict[row[0]] = \
                            list(result_dict[row[0]])\
                            + [dict(service=row[1], cost=row[2])]
                    else:
                        result_dict[row[0]] = \
                            [dict(service=row[1], cost=row[2])]
                kwargs['ti'].xcom_push(
                    key='slack_text',
                    value=create_text(kwargs['extract_date'], result_dict))
    
            extract_task = BigQueryOperator(
                task_id='extract',
                sql='sql/extract_cost.sql',
                filepath="{{ ti.xcom_pull(key='local_log_filepath') }}",
                do_xcom_push=False,
            )
    
            transform_task = PythonOperator(
                task_id="transform",
                op_kwargs={"extract_date": '{{ extract_date(ti) }}'},
                python_callable=transform,
            )
    
            slack_notify_task = SlackAPIPostOperatorAPI1(
                task_id="slack_notify",
                text="{{ ti.xcom_pull(key='slack_text')}}",
                channel="costs"
            )
    
            extract_task >> transform_task >> slack_notify_task
    
        with TaskGroup(
             "clean_group", tooltip="bqデータの整理") as clean_group:
    
            clean_bq_task = BigQueryOperator(
                task_id='clean_bq',
                sql="DELETE FROM {{ bq_table }} \
                     WHERE cast(export_time as date) <= '{{ delete_date(ti) }}'",
                do_xcom_push=False,
            )
    
            clean_bq_task
    
        prepare_task >> main_group >> clean_group
    

    sql/extract.sql


    extract_task에서 사용하는 sql의template
    SELECT project.name AS project_name, service.description AS service_name,
     SUM(cost) AS total_cost
    FROM {{ bq_table }} 
    WHERE CAST(export_time as date) = '{{ extract_date(ti) }}'
    GROUP BY project_name, service_name
    

    BigQueryOperator


    BaseOperatorhttps://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py가 확장되었습니다.
    from google.cloud import bigquery
    from google.oauth2 import service_account
    from airflow.models import Variable
    from airflow.models import BaseOperator
    
    class BigQueryOperator(BaseOperator):
    
        template_fields = ('sql','filepath')
        template_ext = ('.sql')
        ui_color = '#db7093'
    
        def __init__(self, sql, filepath=None, *args, **kwargs):
            """ 
    	:param sql: execute query. Works with both file paths and raw queries
    	:param filepath: save the query execution result. if None, does not save to file
    	"""
            super(BigQueryOperator, self).__init__(*args, **kwargs)
            self.sql = sql
            self.filepath = filepath
    
    
        def execute(self, *args, **kwargs):
            _credentials = service_account.Credentials.from_service_account_file(
                Variable.get("bigquery_sa_keypath"),
                scopes=["https://www.googleapis.com/auth/cloud-platform"],
            )
            _client = bigquery.Client(
                credentials=_credentials,
                project=_credentials.project_id,
            )
            query_job = _client.query(self.sql)
            result = query_job.result()
    
            if self.filepath is not None:
                with open(self.filepath, 'w') as f:
                    for rows in result:
                        [f.write(f"{row},") for row in rows]
                        f.write("\n")
            else:
                return
    

    SlackAPIPostOperator


    SlackAPIoperatorhttps://github.com/apache/airflow/blob/main/airflow/providers/slack/operators/slack.py 제작을 확장합니다.그냥 사용해도 괜찮아요.색깔을 바꾸고 싶고 Operator를 사용할 때 매번 Token을 매개 변수로 설정하는 게 귀찮아서 이렇게 된 것이다.
    import json
    from airflow.models import Variable
    from airflow.operators.slack_operator import SlackAPIOperator
    from typing import Any, List, Optional, Sequence
    
    
    class SlackAPIPostOperator(SlackAPIOperator):
        """
        Posts messages to a slack channel
        Examples:
        https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/_modules/airflow/providers/slack/operators/slack.html#SlackAPIPostOperator
        """
        template_fields: Sequence[str] = (
            'username', 'text', 'attachments', 'blocks', 'channel')
    
        ui_color = '#b6f0dd'
    
        def __init__(
            self,
            channel: str = '#general',
            username: str = '467',
            text: str = 'No message has been set.',
            icon_url: str = 'https://raw.githubusercontent.com/apache/'
            'airflow/main/airflow/www/static/pin_100.png',
            attachments: Optional[List] = None,
            blocks: Optional[List] = None,
            **kwargs,
        ) -> None:
            self.method = 'chat.postMessage'
            self.channel = channel
            self.username = username
            self.text = text
            self.icon_url = icon_url
            self.attachments = attachments or []
            self.blocks = blocks or []
            super().__init__(method=self.method, **kwargs)
    
        def construct_api_call_params(self) -> Any:
            self.api_params = {
                'channel': self.channel,
                'username': self.username,
                'text': self.text,
                'icon_url': self.icon_url,
                'attachments': json.dumps(self.attachments),
                'blocks': json.dumps(self.blocks),
    
            }
    
    
    class SlackAPIPostOperatorAPI1(SlackAPIPostOperator):
        def __init__(self, **kwargs):
            self.token = Variable.get("slack_token_api1")
            super().__init__(token=self.token, **kwargs)
    

    실행 결과


    실행하고 싶으면 Slack APipost Operatar API1이 지정한 채널에서 다음과 같은 알림이 올 것입니다.

    총결산


    매일 슬랙에 GCP 사용료를 알리는 DAG를 소개했다.만약 코드의 설치가 적합하지 않거나 좋은 방법이 있다면, 지적해 주신다면 저는 매우 기쁠 것입니다.

    좋은 웹페이지 즐겨찾기