[AWS Glue] 쿼리를 바탕으로 Cloudfront 로그를 Parquet & JST의 ETL (+ 구역 분할) 단계로 나누기

7227 단어 AWSsparksqlglue
알림으로 다음과 같은 용례에 대한 대응 절차를 기재한다.
(aws 컨트롤러의 사용 방법 등 세부 부분은 생략)

용례


Athena로 Cloudfront 로그 분석
・Cloudfront 로그 내의 시간은 UTC이기 때문에 JST로 미리 변경하려고 합니다
● Athena에서 전체 스캔이 발생하지 않도록 날짜로 데이터를 분할(분할 가능한 형식으로 변환)
· 과거에 합산된 부분도 다시 ETL로 각 구역을 덮어쓰길 희망한다

단계 예


단계 예제의 전제 조건


・Cloudfront 로그의 테이블 정의 데이터베이스 & 테이블 이름:cloudfrontaccess_logs.app_log
/ETL 이후의 Parquet 출력 목적지:s3:/some-bucket/ETLcloudfront_logs
• Glue Crawler 이름: clfparquet_test
• 저장 방법: overwrite
· ETL의 지침: 가능한 한 Spark SQL로 완성 (로컬 등지에서 ETL 검증이 용이함)

1. Cloudfront의 로그 기능 사용


2. Cloudfront 로그의 테이블 정의 만들기


아래 서술한 내용을 참고하다
https://qiita.com/ytanaka3/items/ad5e7d96bc425ff4c843

3. Glue Crawler를 추가하여 ETL 출력 후 Parquet 데이터를 업데이트하는 테이블 정의



4. 2.ETL+마지막 3.Crawler를 호출하는 Glue Job 만들기


[스크립트]
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

import boto3

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "cloudfront_access_logs", table_name = "app_log", transformation_ctx = "datasource0")

## Transform
df = datasource0.toDF()
df.createOrReplaceTempView('tmp')

# クエリベースでETL (ログ内の時間のUTCをJSTに変換 + パーティション分割用にyyyy-MM-ddのカラムを追加)
df_sql = spark.sql(
'''
SELECT
date_format( from_utc_timestamp(concat( request_date, " ", request_time ), 'JST'), 'yyyy-MM-dd') AS `dt`,
unix_timestamp( from_utc_timestamp(concat( request_date, " ", request_time ), 'JST')) AS `timestamp_JST`,
from_utc_timestamp(concat( request_date, " ", request_time ), 'JST') AS `date_JST`,
  `x_edge_location`,
  `sc_bytes`,
  `client_ip`,
  `cs_method`,
  `cs_host`,
  `cs_uri_stem`,
  `sc_status`,
  `cs_referer`,
  `user_agent`,
  `uri_query`,
  `cookie`,
  `x_edge_result_type`,
  `x_edge_request_id`,
  `x_host_header`,
  `cs_protocol`,
  `cs_bytes`,
  `time_taken`,
  `x_forwarded_for`,
  `ssl_protocol`,
  `ssl_cipher`,
  `x_edge_response_result_type`,
  `cs_protocol_version`
FROM tmp
WHERE
cs_method != '' or cs_method is NOT NULL
''')
#df_sql.show() #確認用

# 追加したカラムでpartition分割し、overwrite
df_sql.repartition(*["dt"]).write.partitionBy(["dt"]).mode("overwrite").parquet("s3://some-bucket/ETL_cloudfront_logs", compression="gzip")

# Crawlerの開始
aws_glue_client = boto3.client('glue', region_name='us-east-1')
aws_glue_client.start_crawler(Name='clf_parquet_test')

job.commit()
그리고 용도에 따라 S3 기반 로그 저장 기간의 설정, 조회의 가져오기 기간의 지정 등을 진행한다.

좋은 웹페이지 즐겨찾기