GCP 사용료를 슬랙의 with Airflow에 일일 공지
개요
GCP를 사용하면서도 클라우드 파산은 피하고 싶습니다. 예산 경보로 통지되기 전에 미리 알아차리고 싶기 때문에, "Cloud Billing의 BigQuery 내보내기 기능"과 "Airflow"를 사용하여 향후 청구될 사용료를 슬랙에 알리는 DAG를 만듭니다.
하고 싶은 일
절차.
사전 준비: Cloud Billing 데이터를 BigQuery로 내보내기
{ "billing_account_id": "XXX", "service": { "id": "XXX", "description": "Cloud Logging" }, "sku": { "id": "143F-A1B0-E0BE", "description": "Log Volume" }, "usage_start_time": "2022-04-20T02:00:00Z", "usage_end_time": "2022-04-20T03:00:00Z", "project": { "id": "XXX", "number": "61685520625", "name": "XXX", "labels": [], "ancestry_numbers": null, "ancestors": [{ "resource_name": "XXX", "display_name": "XXX" }] }, "labels": [{ "key": "goog-resource-type", "value": "bigquery_dataset" }], "system_labels": [], "location": { "location": "us", "country": "US", "region": null, "zone": null }, "export_time": "2022-04-20T05:55:41.063Z", "cost": "0.0", "currency": "JPY", "currency_conversion_rate": "122.83500000551589", "usage": { "amount": "933.0", "unit": "bytes", "amount_in_pricing_units": "8.6892396211624146e-07", "pricing_unit": "gibibyte" }, "credits": [], "invoice": { "month": "202204" }, "cost_type": "regular", "adjustment_info": null}
{ "billing_account_id": "XXX", "service": { "id": "XXX", "description": "Cloud Logging" }, "sku": { "id": "143F-A1B0-E0BE", "description": "Log Volume" }, "usage_start_time": "2022-04-20T00:00:00Z", "usage_end_time": "2022-04-20T01:00:00Z", "project": { "id": "XXX", "number": "61685520625", "name": "XXX", "labels": [], "ancestry_numbers": null, "ancestors": [{ "resource_name": "XXX", "display_name": "XXX" }] }, "labels": [{ "key": "goog-resource-type", "value": "bigquery_project" }], "system_labels": [], "location": { "location": "us", "country": "US", "region": null, "zone": null }, "export_time": "2022-04-20T04:04:18.373Z", "cost": "0.0", "currency": "JPY", "currency_conversion_rate": "122.83500000551589", "usage": { "amount": "8498.0", "unit": "bytes", "amount_in_pricing_units": "7.9143792390823364e-06", "pricing_unit": "gibibyte" }, "credits": [], "invoice": { "month": "202204" }, "cost_type": "regular", "adjustment_info": null}
{ "billing_account_id": "XXX", "service": { "id": "XXX", "description": "Cloud Logging" }, "sku": { "id": "143F-A1B0-E0BE", "description": "Log Volume" }, "usage_start_time": "2022-04-20T00:00:00Z", "usage_end_time": "2022-04-20T01:00:00Z", "project": { "id": "XXX", "number": "61685520625", "name": "XXX", "labels": [], "ancestry_numbers": null, "ancestors": [{ "resource_name": "XXX", "display_name": "XXX" }] }, "labels": [{ "key": "goog-resource-type", "value": "bigquery_dataset" }], "system_labels": [], "location": { "location": "us", "country": "US", "region": null, "zone": null }, "export_time": "2022-04-20T04:04:18.373Z", "cost": "0.0", "currency": "JPY", "currency_conversion_rate": "122.83500000551589", "usage": { "amount": "4609.0", "unit": "bytes", "amount_in_pricing_units": "4.2924657464027405e-06", "pricing_unit": "gibibyte" }, "credits": [], "invoice": { "month": "202204" }, "cost_type": "regular", "adjustment_info": null}
DAG 전체 차트
プロジェクトA,サービスA,0.0,
プロジェクトA,サービスB,0.0,
プロジェクトA,サービスC,155.30447499999997,
예를 들어 상기와 같은 데이터의 경우 -> 아래와 같은 text를 조립한다.2022-04-14のGCP使用料金
プロジェクトA----
・サービスC ¥156
・サービスA ¥0
・サービスB ¥0
100円以上でした。
설정된 Variables
main.py
import uuid
from datetime import timedelta
from operator import itemgetter
from airflow import DAG
from airflow.models import Variable
from airflow.macros import ds_add
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
from airflow.operators.python_operator import PythonOperator
from airflow_dag_sample.operators.slack_api_post_operator\
import SlackAPIPostOperatorAPI1
from airflow_dag_sample.operators.bigquery_operator import BigQueryOperator
# Defaults applied to every task in this DAG.
default_args = dict(
    owner='owner',
    depends_on_past=False,   # each run is independent of the previous one
    start_date=days_ago(2),
)
def get_extract_date(ti):
    """Return the 'extract_date' value pushed to XCom by the prepare task."""
    extract_date = ti.xcom_pull(task_ids='prepare', key='extract_date')
    return extract_date
def get_delete_date(ti):
    """Return the 'delete_date' value pushed to XCom by the prepare task."""
    delete_date = ti.xcom_pull(task_ids='prepare', key='delete_date')
    return delete_date
def _macros():
    """Build the user-defined macros exposed to Jinja templates in this DAG."""
    dataset = Variable.get('cost_dataset')
    table = Variable.get('cost_table')
    return {
        # Fully-qualified BigQuery table holding the billing export.
        'bq_table': f"{dataset}.dataset.{table}",
        'extract_date': get_extract_date,
        'delete_date': get_delete_date,
    }
with DAG(
    'test_slack',
    default_args=default_args,
    description='日々のGCP使用料をSlackに通知するDAG',
    schedule_interval=timedelta(days=1),
    tags=["work"],
    user_defined_macros=_macros(),
) as dag:
    # BUG FIX: the doc previously listed "slack_token_hatiware", but
    # SlackAPIPostOperatorAPI1 actually reads the "slack_token_api1" Variable.
    dag.doc_md = """\
## GCPの使用料金をSlackに通知するDAG
### 使用するVariable
- bigquery_sa_keypath
- slack_token_api1
- cost_dataset
- cost_table
"""

    def prepare(**kwargs):
        """Decide which billing date to report and where to stage the CSV.

        Pushes three XCom keys: 'extract_date' (date whose costs are
        reported), 'delete_date' (cutoff for purging old BigQuery rows),
        and 'local_log_filepath' (CSV path for the extract task).
        """
        # A manual (non-backfill) trigger reports yesterday's costs;
        # a scheduled or backfill run uses the logical date itself.
        if kwargs['dag_run'].external_trigger \
                and not kwargs['dag_run'].is_backfill:
            extract_date = kwargs['yesterday_ds']
        else:
            extract_date = kwargs['ds']
        # Purge BigQuery rows older than 5 days before the extract date.
        delete_date = ds_add(extract_date, -5)
        kwargs['ti'].xcom_push(key='extract_date', value=extract_date)
        kwargs['ti'].xcom_push(key='delete_date', value=delete_date)
        # CSV output destination; the uuid suffix avoids collisions
        # between concurrent or retried runs.
        local_log_dir = '/var/log/bigquery/'
        suffix = str(uuid.uuid4()).replace('-', '_')
        kwargs['ti'].xcom_push(
            key='local_log_filepath',
            value=f'{local_log_dir}{extract_date}_{suffix}.csv')

    prepare_task = PythonOperator(
        task_id="prepare",
        python_callable=prepare,
    )

    with TaskGroup(
        "main_task_group", tooltip="bqからextract_dateのgcp利用料金を抽出・加工し、Slackに送る"
    ) as main_group:

        def reader(filepath: str):
            """Yield each line of the extract CSV as a list of fields.

            NOTE: this is a generator (the original annotated it as
            ``-> list``, which was wrong).
            """
            with open(filepath) as f:
                for row in f:
                    yield row.split(',')

        def check_total(total_cost: int) -> str:
            """Return the summary sentence for the daily total.

            BUG FIX: the parameter was annotated ``str``; it receives an
            int and is compared numerically below.
            """
            if total_cost >= 100:
                return "100円以上でした。"
            else:
                return "100円未満でした。"

        def create_text(extract_date: str, result_dict: dict) -> str:
            """Assemble the Slack message from per-project service costs.

            ``result_dict`` maps project name -> list of
            ``{'service': str, 'cost': str}`` dicts (cost is the raw CSV
            string).
            """
            text = f"{extract_date}のGCP使用料金"
            total_cost = 0
            for project, values in result_dict.items():
                text = text + f"\n{project}----"
                # Sort services by descending cost.
                # BUG FIX: 'cost' is a string taken from the CSV, so sorting
                # with itemgetter('cost') compared lexicographically
                # (e.g. "20.0" > "155.3"); compare as floats instead.
                for value in sorted(
                        values, key=lambda v: float(v['cost']),
                        reverse=True):
                    cost = int(float(value['cost']))
                    text = text + f"\n・{value['service']} ¥{cost}"
                    total_cost += cost
            text = text + f"\n\n{check_total(total_cost)}"
            return text

        def transform(**kwargs):
            """Group the extracted CSV rows by project and push the Slack text."""
            filepath = kwargs['ti'].xcom_pull(key='local_log_filepath')
            result_dict = {}
            for row in reader(filepath):
                # row = [project_name, service_name, total_cost, ...]
                result_dict.setdefault(row[0], []).append(
                    dict(service=row[1], cost=row[2]))
            kwargs['ti'].xcom_push(
                key='slack_text',
                value=create_text(kwargs['extract_date'], result_dict))

        extract_task = BigQueryOperator(
            task_id='extract',
            sql='sql/extract_cost.sql',
            filepath="{{ ti.xcom_pull(key='local_log_filepath') }}",
            do_xcom_push=False,
        )
        transform_task = PythonOperator(
            task_id="transform",
            op_kwargs={"extract_date": '{{ extract_date(ti) }}'},
            python_callable=transform,
        )
        slack_notify_task = SlackAPIPostOperatorAPI1(
            task_id="slack_notify",
            text="{{ ti.xcom_pull(key='slack_text')}}",
            channel="costs"
        )
        extract_task >> transform_task >> slack_notify_task

    with TaskGroup(
            "clean_group", tooltip="bqデータの整理") as clean_group:
        clean_bq_task = BigQueryOperator(
            task_id='clean_bq',
            sql=("DELETE FROM {{ bq_table }} "
                 "WHERE cast(export_time as date) <= '{{ delete_date(ti) }}'"),
            do_xcom_push=False,
        )
        clean_bq_task

    prepare_task >> main_group >> clean_group
sql/extract_cost.sql
extract_task에서 사용하는 SQL 템플릿 (DAG에서 `sql='sql/extract_cost.sql'`로 참조됨)
-- Per-project, per-service cost totals for one export date.
-- {{ bq_table }} and {{ extract_date(ti) }} are rendered by Airflow's
-- Jinja templating (user_defined_macros) before the query runs.
SELECT project.name AS project_name, service.description AS service_name,
SUM(cost) AS total_cost
FROM {{ bq_table }}
WHERE CAST(export_time as date) = '{{ extract_date(ti) }}'
GROUP BY project_name, service_name
BigQueryOperator
BaseOperator(https://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py)를 확장한 Operator입니다.
from google.cloud import bigquery
from google.oauth2 import service_account
from airflow.models import Variable
from airflow.models import BaseOperator
class BigQueryOperator(BaseOperator):
    """Run a BigQuery SQL query, optionally dumping the result to a CSV file.

    :param sql: query to execute; works with both .sql file paths
        (rendered via Airflow templating) and raw query strings
    :param filepath: where to save the query result; if None, the result
        is discarded
    """
    template_fields = ('sql', 'filepath')
    # BUG FIX: ('.sql') is just the string '.sql', not a tuple — Airflow
    # would then match each character ('.', 's', 'q', 'l') as an
    # extension. The trailing comma makes it a real 1-tuple.
    template_ext = ('.sql',)
    ui_color = '#db7093'

    def __init__(self, sql, filepath=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.sql = sql
        self.filepath = filepath

    def execute(self, *args, **kwargs):
        # Authenticate with the service-account key file whose path is
        # stored in the "bigquery_sa_keypath" Airflow Variable.
        _credentials = service_account.Credentials.from_service_account_file(
            Variable.get("bigquery_sa_keypath"),
            scopes=["https://www.googleapis.com/auth/cloud-platform"],
        )
        _client = bigquery.Client(
            credentials=_credentials,
            project=_credentials.project_id,
        )
        result = _client.query(self.sql).result()
        if self.filepath is None:
            return
        # Write one CSV line per result row. Every field is followed by a
        # comma (including the last), matching the downstream reader().
        with open(self.filepath, 'w') as f:
            for row in result:
                f.write(''.join(f"{field}," for field in row))
                f.write("\n")
SlackAPIPostOperator
SlackAPIOperator(https://github.com/apache/airflow/blob/main/airflow/providers/slack/operators/slack.py)를 확장한 것입니다. 그냥 사용해도 괜찮지만, 색깔을 바꾸고 싶었고 Operator를 사용할 때마다 Token을 매개 변수로 설정하는 것이 귀찮아서 이렇게 만들었습니다.
import json
from airflow.models import Variable
from airflow.operators.slack_operator import SlackAPIOperator
from typing import Any, List, Optional, Sequence
class SlackAPIPostOperator(SlackAPIOperator):
    """
    Posts messages to a slack channel
    Examples:
    https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/_modules/airflow/providers/slack/operators/slack.html#SlackAPIPostOperator
    """

    template_fields: Sequence[str] = (
        'username', 'text', 'attachments', 'blocks', 'channel')
    ui_color = '#b6f0dd'

    def __init__(
        self,
        channel: str = '#general',
        username: str = '467',
        text: str = 'No message has been set.',
        icon_url: str = 'https://raw.githubusercontent.com/apache/'
        'airflow/main/airflow/www/static/pin_100.png',
        attachments: Optional[List] = None,
        blocks: Optional[List] = None,
        **kwargs,
    ) -> None:
        # The Slack Web API method this operator invokes.
        self.method = 'chat.postMessage'
        self.channel = channel
        self.username = username
        self.text = text
        self.icon_url = icon_url
        # Default to empty lists so construct_api_call_params can always
        # JSON-encode these fields.
        self.attachments = attachments or []
        self.blocks = blocks or []
        super().__init__(method=self.method, **kwargs)

    def construct_api_call_params(self) -> Any:
        """Assemble the chat.postMessage payload used by the base operator."""
        self.api_params = dict(
            channel=self.channel,
            username=self.username,
            text=self.text,
            icon_url=self.icon_url,
            attachments=json.dumps(self.attachments),
            blocks=json.dumps(self.blocks),
        )
class SlackAPIPostOperatorAPI1(SlackAPIPostOperator):
    """SlackAPIPostOperator that reads its token from the Airflow Variable
    "slack_token_api1", so callers need not pass a token every time."""

    def __init__(self, **kwargs):
        token = Variable.get("slack_token_api1")
        self.token = token
        super().__init__(token=token, **kwargs)
실행 결과
실행하면 SlackAPIPostOperatorAPI1이 지정한 채널로 다음과 같은 알림이 올 것입니다.
총결산
매일 슬랙에 GCP 사용료를 알리는 DAG를 소개했다.만약 코드의 설치가 적합하지 않거나 좋은 방법이 있다면, 지적해 주신다면 저는 매우 기쁠 것입니다.
Reference
이 문제에 관하여(GCP 사용료를 슬랙의 with Airflow에 일일 공지), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://zenn.dev/467/articles/33ac3de3372d13텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)