AWS Data Wrangler를 사용하여 AWS CloudTrail 데이터 변환
CloudTrail 대시보드
트레일 생성
캡처할 이벤트 선택
모든 현재 및 향후 S3 버킷에 대한 읽기-쓰기, 읽기 전용, 쓰기 전용 데이터 이벤트를 기록하도록 추적을 구성할 수 있습니다.
또한 Lambda 함수에 대한 데이터 이벤트를 기록하는 옵션도 있습니다. 모든 지역, 모든 기능을 선택하거나 특정 Lambda ARN 또는 지역을 지정할 수 있습니다.
트레일은 S3 버킷에 작은 크기의 대부분 KB 크기의 gzip 압축된 json 파일을 생성합니다.
S3 버킷의 추적 로그 파일
파일을 선택하고 "다음에서 선택"탭을 사용하여 파일 내용을 볼 수 있습니다.
S3 파일에서 선택
다음은 S3 버킷에 대한 "PutObject"이벤트의 예입니다.
{ "eventVersion": "1.07", "userIdentity": { "type": "AWSService", "invokedBy": "s3.amazonaws.com" }, "eventTime": "2020-09-12T23:53:22Z", "eventSource": "s3.amazonaws.com", "eventName": "PutObject", "awsRegion": "us-east-1", "sourceIPAddress": "s3.amazonaws.com", "userAgent": "s3.amazonaws.com", "requestParameters": { "bucketName": "my-data-bucket", "Host": "s3.us-east-1.amazonaws.com", "key": "mydatabase/mytable/data-content.snappy.parquet" }, "responseElements": null, "additionalEventData": { "SignatureVersion": "SigV4", "CipherSuite": "ECDHE-RSA-AES128-SHA", "bytesTransferredIn": 107886, "AuthenticationMethod": "AuthHeader", "x-amz-id-2": "Dg9gelyiPojDT00UJ+CI7MmmEyUhPRe1EAUtzQSs3kJAZ8JxMe+2IQ4f6wT2Kpd+Czih1Dc2SI8=", "bytesTransferredOut": 0 }, "requestID": "29C76F4BC75743BF", "eventID": "6973f9b1-1a7d-46d4-a48f-f2d91c80b2d3", "readOnly": false, "resources": [ { "type": "AWS::S3::Object", "ARN": "arn:aws:s3:::my-data-bucket/mydatabase/mytable/data-content.snappy.parquet" }, { "accountId": "xxxxxxxxxxxx", "type": "AWS::S3::Bucket", "ARN": "arn:aws:s3:::my-data-bucket" } ], "eventType": "AwsApiCall", "managementEvent": false, "recipientAccountId": "xxxxxxxxxxxx", "sharedEventID": "eb37214b-623b-43e6-876b-7088c7d0e0ee", "vpcEndpointId": "vpce-xxxxxxx", "eventCategory": "Data" }
CloudTrail provides a useful feature under Event history to create Athena table over the trail's Amazon S3 bucket which you can use to query the data using standard SQL.
Now, depending on the duration and events captured, CloudTrail would create lots of small files in S3, which can impact execution time, when queried from Athena.
Moving ahead, I will show you how you can use AWS Data Wrangler and Pandas to perform the following:
- Query data from Athena into Pandas dataframe using AWS Data Wrangler.
- Transform eventtime string datatype to datetime datatype.
- Extract and add year, month, and day columns from eventtime to dataframe.
- Write dataframe to S3 in Parquet format with hive partition using AWS Data Wrangler.
- Along with writing the dataframe, how you can create the table in Glue catalog using AWS Data Wrangler.
For this example, I have setup a Sagemaker Notebook with Lifecycle configuration. Once you have the notebook open, you can use conda_python3 kernel to work using AWS Data Wrangler.
Import the required libraries
import awswrangler as wr
import pandas as pd
pd.set_option('display.width', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.notebook_repr_html', True)
pd.set_option('display.max_rows', None)
AWS Data Wrangler를 사용하여 Athena에서 SQL을 실행하는 Python 함수
def execute_sql(sql, database, ctas=False):
return wr.athena.read_sql_query(sql, database, ctas_approach=ctas)
S3 이벤트와 관련된 세부 정보를 가져오는 SQL 쿼리
s3ObjectSql = """
SELECT
useridentity.sessioncontext.sessionissuer.username as username,
useridentity.sessioncontext.sessionissuer.type as type,
useridentity.principalid as principalid,
useridentity.invokedby as invokedby,
eventname as event_name,
eventtime,
eventsource as event_source,
awsregion as aws_region,
sourceipaddress,
eventtype as event_type,
readonly as read_only,
requestparameters
FROM cloudtrail_logs_cloudtrail_logs_traillogs
WHERE eventname in ('ListObjects', 'PutObject', 'GetObject') and eventtime > '2020-08-23'
"""
SQL을 실행하고 Pandas 데이터 프레임에서 결과를 얻습니다.
data = execute_sql(sql=s3GObjectSql, database='default')
고유한 사용자 이름 찾기(재미로)
data['username'].value_counts()
관찰하면 eventtime 열이 "문자열"데이터 유형이므로 날짜 변환을 수행하기가 어려울 것입니다. 따라서 여기에서는 datetime 데이터 유형과 drop eventtime으로 새 열을 생성합니다.
eventtime 열에 대한 문자열을 Datetime으로 변환
data['event_time'] = pd.to_datetime(data['eventtime'], errors='coerce')
data.drop('eventtime', axis=1, inplace=True)
event_time 칼럼에서 년, 월, 일 칼럼을 추출하여 추가해 보자. 이 변경으로 데이터를 Hive 파티션으로 S3에 다시 쓸 수 있습니다.
데이터 프레임에 새 필드 추출 및 추가
data['year'] = data['event_time'].dt.year
data['month'] = data['event_time'].dt.month
data['day'] = data['event_time'].dt.day
이제 AWS Data Wrangler s3.to_parquet API를 사용하여 연도, 월, 일 및 parquet 형식으로 분할된 S3에 데이터를 다시 쓸 수 있습니다. 또한 여기에 데이터베이스 및 테이블 파라미터를 추가하여 Athena/Glue 카탈로그에 메타데이터를 작성할 수 있습니다. 성공하려면 명령이 되려면 데이터베이스가 있어야 합니다.
wr.s3.to_parquet(
df=data,
path='s3://my-bucket/s3_access_analyzer/cloudtrail/',
dataset=True,
partition_cols=['year', 'month', 'day'],
database='default', # Athena/Glue database
table='cloudtrail' # Athena/Glue table
)
Athena에 쿼리하여 결과를 볼 수 있습니다.
0KB의 데이터를 스캔하여 쿼리를 완료하는 데 단 1.74초가 걸렸습니다. 이제 왜 0KB입니까? 글쎄, 나는 당신이 생각하고 대답하도록 남겨 둘 것입니다 :)
결론적으로 AWS Data Wrangler를 사용하면 위와 같이 추출, 변환 및 로드(ETL) 작업을 쉽고 효율적으로 수행할 수 있습니다. 다른 AWS 서비스와 원활하게 통합되며 새로운 기능과 향상된 기능으로 활발하게 업데이트되고 있습니다.
Reference
이 문제에 관하여(AWS Data Wrangler를 사용하여 AWS CloudTrail 데이터 변환), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/anandp86/transform-aws-cloudtrail-data-using-aws-data-wrangler-1cmo텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)