[AWS Glue] 쿼리를 바탕으로 Cloudfront 로그를 Parquet & JST의 ETL (+ 구역 분할) 단계로 나누기
(aws 컨트롤러의 사용 방법 등 세부 부분은 생략)
용례
Athena로 Cloudfront 로그 분석
・Cloudfront 로그 내의 시간은 UTC이기 때문에 JST로 미리 변경하려고 합니다
● Athena에서 전체 스캔이 발생하지 않도록 날짜로 데이터를 분할(분할 가능한 형식으로 변환)
· 과거에 합산된 부분도 다시 ETL로 각 구역을 덮어쓰길 희망한다
단계 예
단계 예제의 전제 조건
・Cloudfront 로그의 테이블 정의 데이터베이스 & 테이블 이름:cloudfrontaccess_logs.app_log
/ETL 이후의 Parquet 출력 목적지:s3:/some-bucket/ETLcloudfront_logs
• Glue Crawler 이름: clfparquet_test
• 저장 방법: overwrite
· ETL의 지침: 가능한 한 Spark SQL로 완성 (로컬 등지에서 ETL 검증이 용이함)
1. Cloudfront의 로그 기능 사용
2. Cloudfront 로그의 테이블 정의 만들기
아래 서술한 내용을 참고하다
https://qiita.com/ytanaka3/items/ad5e7d96bc425ff4c843
3. Glue Crawler를 추가하여 ETL 출력 후 Parquet 데이터를 업데이트하는 테이블 정의
4. 2.ETL+마지막 3.Crawler를 호출하는 Glue Job 만들기
[스크립트]import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import boto3
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "cloudfront_access_logs", table_name = "app_log", transformation_ctx = "datasource0")
## Transform
df = datasource0.toDF()
df.createOrReplaceTempView('tmp')
# クエリベースでETL (ログ内の時間のUTCをJSTに変換 + パーティション分割用にyyyy-MM-ddのカラムを追加)
df_sql = spark.sql(
'''
SELECT
date_format( from_utc_timestamp(concat( request_date, " ", request_time ), 'JST'), 'yyyy-MM-dd') AS `dt`,
unix_timestamp( from_utc_timestamp(concat( request_date, " ", request_time ), 'JST')) AS `timestamp_JST`,
from_utc_timestamp(concat( request_date, " ", request_time ), 'JST') AS `date_JST`,
`x_edge_location`,
`sc_bytes`,
`client_ip`,
`cs_method`,
`cs_host`,
`cs_uri_stem`,
`sc_status`,
`cs_referer`,
`user_agent`,
`uri_query`,
`cookie`,
`x_edge_result_type`,
`x_edge_request_id`,
`x_host_header`,
`cs_protocol`,
`cs_bytes`,
`time_taken`,
`x_forwarded_for`,
`ssl_protocol`,
`ssl_cipher`,
`x_edge_response_result_type`,
`cs_protocol_version`
FROM tmp
WHERE
cs_method != '' or cs_method is NOT NULL
''')
#df_sql.show() #確認用
# 追加したカラムでpartition分割し、overwrite
df_sql.repartition(*["dt"]).write.partitionBy(["dt"]).mode("overwrite").parquet("s3://some-bucket/ETL_cloudfront_logs", compression="gzip")
# Crawlerの開始
aws_glue_client = boto3.client('glue', region_name='us-east-1')
aws_glue_client.start_crawler(Name='clf_parquet_test')
job.commit()
그리고 용도에 따라 S3 기반 로그 저장 기간의 설정, 조회의 가져오기 기간의 지정 등을 진행한다.
Reference
이 문제에 관하여([AWS Glue] 쿼리를 바탕으로 Cloudfront 로그를 Parquet & JST의 ETL (+ 구역 분할) 단계로 나누기), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/kkkdev/items/d088fe116201c236784d
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
단계 예제의 전제 조건
・Cloudfront 로그의 테이블 정의 데이터베이스 & 테이블 이름:cloudfrontaccess_logs.app_log
/ETL 이후의 Parquet 출력 목적지:s3:/some-bucket/ETLcloudfront_logs
• Glue Crawler 이름: clfparquet_test
• 저장 방법: overwrite
· ETL의 지침: 가능한 한 Spark SQL로 완성 (로컬 등지에서 ETL 검증이 용이함)
1. Cloudfront의 로그 기능 사용
2. Cloudfront 로그의 테이블 정의 만들기
아래 서술한 내용을 참고하다
https://qiita.com/ytanaka3/items/ad5e7d96bc425ff4c843
3. Glue Crawler를 추가하여 ETL 출력 후 Parquet 데이터를 업데이트하는 테이블 정의
4. 2.ETL+마지막 3.Crawler를 호출하는 Glue Job 만들기
[스크립트]
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import boto3
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "cloudfront_access_logs", table_name = "app_log", transformation_ctx = "datasource0")
## Transform
df = datasource0.toDF()
df.createOrReplaceTempView('tmp')
# クエリベースでETL (ログ内の時間のUTCをJSTに変換 + パーティション分割用にyyyy-MM-ddのカラムを追加)
df_sql = spark.sql(
'''
SELECT
date_format( from_utc_timestamp(concat( request_date, " ", request_time ), 'JST'), 'yyyy-MM-dd') AS `dt`,
unix_timestamp( from_utc_timestamp(concat( request_date, " ", request_time ), 'JST')) AS `timestamp_JST`,
from_utc_timestamp(concat( request_date, " ", request_time ), 'JST') AS `date_JST`,
`x_edge_location`,
`sc_bytes`,
`client_ip`,
`cs_method`,
`cs_host`,
`cs_uri_stem`,
`sc_status`,
`cs_referer`,
`user_agent`,
`uri_query`,
`cookie`,
`x_edge_result_type`,
`x_edge_request_id`,
`x_host_header`,
`cs_protocol`,
`cs_bytes`,
`time_taken`,
`x_forwarded_for`,
`ssl_protocol`,
`ssl_cipher`,
`x_edge_response_result_type`,
`cs_protocol_version`
FROM tmp
WHERE
cs_method != '' or cs_method is NOT NULL
''')
#df_sql.show() #確認用
# 追加したカラムでpartition分割し、overwrite
df_sql.repartition(*["dt"]).write.partitionBy(["dt"]).mode("overwrite").parquet("s3://some-bucket/ETL_cloudfront_logs", compression="gzip")
# Crawlerの開始
aws_glue_client = boto3.client('glue', region_name='us-east-1')
aws_glue_client.start_crawler(Name='clf_parquet_test')
job.commit()
그리고 용도에 따라 S3 기반 로그 저장 기간의 설정, 조회의 가져오기 기간의 지정 등을 진행한다.
Reference
이 문제에 관하여([AWS Glue] 쿼리를 바탕으로 Cloudfront 로그를 Parquet & JST의 ETL (+ 구역 분할) 단계로 나누기), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/kkkdev/items/d088fe116201c236784d텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)