AWS Data Wrangler를 사용하여 AWS CloudTrail 데이터 변환

AWS CloudTrail 서비스는 IAM 사용자, IAM 역할, API, SDK 및 기타 AWS 서비스에서 수행한 작업을 캡처합니다. 기본적으로 AWS CloudTrail은 AWS 계정에서 활성화되어 있습니다. 선택한 Amazon S3 버킷에 JSON 형식으로 전달될 진행 중인 이벤트를 기록하는 "트레일"을 생성할 수 있습니다.

CloudTrail 대시보드

트레일 생성

캡처할 이벤트 선택

모든 현재 및 향후 S3 버킷에 대한 읽기-쓰기, 읽기 전용, 쓰기 전용 데이터 이벤트를 기록하도록 추적을 구성할 수 있습니다.



또한 Lambda 함수에 대한 데이터 이벤트를 기록하는 옵션도 있습니다. 모든 지역, 모든 기능을 선택하거나 특정 Lambda ARN 또는 지역을 지정할 수 있습니다.

트레일은 S3 버킷에 작은 크기의 대부분 KB 크기의 gzip 압축된 json 파일을 생성합니다.

S3 버킷의 추적 로그 파일

파일을 선택하고 "다음에서 선택"탭을 사용하여 파일 내용을 볼 수 있습니다.

S3 파일에서 선택

다음은 S3 버킷에 대한 "PutObject"이벤트의 예입니다.
{
    "eventVersion": "1.07",
    "userIdentity": {
        "type": "AWSService",
        "invokedBy": "s3.amazonaws.com"
    },
    "eventTime": "2020-09-12T23:53:22Z",
    "eventSource": "s3.amazonaws.com",
    "eventName": "PutObject",
    "awsRegion": "us-east-1",
    "sourceIPAddress": "s3.amazonaws.com",
    "userAgent": "s3.amazonaws.com",
    "requestParameters": {
        "bucketName": "my-data-bucket",
        "Host": "s3.us-east-1.amazonaws.com",
        "key": "mydatabase/mytable/data-content.snappy.parquet"
    },
    "responseElements": null,
    "additionalEventData": {
        "SignatureVersion": "SigV4",
        "CipherSuite": "ECDHE-RSA-AES128-SHA",
        "bytesTransferredIn": 107886,
        "AuthenticationMethod": "AuthHeader",
        "x-amz-id-2": "Dg9gelyiPojDT00UJ+CI7MmmEyUhPRe1EAUtzQSs3kJAZ8JxMe+2IQ4f6wT2Kpd+Czih1Dc2SI8=",
        "bytesTransferredOut": 0
    },
    "requestID": "29C76F4BC75743BF",
    "eventID": "6973f9b1-1a7d-46d4-a48f-f2d91c80b2d3",
    "readOnly": false,
    "resources": [
        {
            "type": "AWS::S3::Object",
            "ARN": "arn:aws:s3:::my-data-bucket/mydatabase/mytable/data-content.snappy.parquet"
        },
        {
            "accountId": "xxxxxxxxxxxx",
            "type": "AWS::S3::Bucket",
            "ARN": "arn:aws:s3:::my-data-bucket"
        }
    ],
    "eventType": "AwsApiCall",
    "managementEvent": false,
    "recipientAccountId": "xxxxxxxxxxxx",
    "sharedEventID": "eb37214b-623b-43e6-876b-7088c7d0e0ee",
    "vpcEndpointId": "vpce-xxxxxxx",
    "eventCategory": "Data"
}

CloudTrail provides a useful feature under Event history to create Athena table over the trail's Amazon S3 bucket which you can use to query the data using standard SQL.

Now, depending on the duration and events captured, CloudTrail would create lots of small files in S3, which can impact execution time, when queried from Athena.

Moving ahead, I will show you how you can use AWS Data Wrangler and Pandas to perform the following:

  1. Query data from Athena into Pandas dataframe using AWS Data Wrangler.
  2. Transform eventtime string datatype to datetime datatype.
  3. Extract and add year, month, and day columns from eventtime to dataframe.
  4. Write dataframe to S3 in Parquet format with hive partition using AWS Data Wrangler.
  5. Along with writing the dataframe, how you can create the table in Glue catalog using AWS Data Wrangler.

For this example, I have setup a Sagemaker Notebook with Lifecycle configuration. Once you have the notebook open, you can use conda_python3 kernel to work using AWS Data Wrangler.

Import the required libraries

import awswrangler as wr
import pandas as pd
pd.set_option('display.width', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.notebook_repr_html', True)
pd.set_option('display.max_rows', None)

AWS Data Wrangler를 사용하여 Athena에서 SQL을 실행하는 Python 함수
def execute_sql(sql, database, ctas=False):
    return wr.athena.read_sql_query(sql, database, ctas_approach=ctas)

S3 이벤트와 관련된 세부 정보를 가져오는 SQL 쿼리
s3ObjectSql = """
SELECT 
    useridentity.sessioncontext.sessionissuer.username as username,
    useridentity.sessioncontext.sessionissuer.type as type,
    useridentity.principalid as principalid,
    useridentity.invokedby as invokedby,
    eventname as event_name,
    eventtime,
    eventsource as event_source,
    awsregion as aws_region,
    sourceipaddress,
    eventtype as event_type,
    readonly as read_only,
    requestparameters
FROM cloudtrail_logs_cloudtrail_logs_traillogs
WHERE eventname in ('ListObjects', 'PutObject', 'GetObject') and eventtime > '2020-08-23'
"""

SQL을 실행하고 Pandas 데이터 프레임에서 결과를 얻습니다.
data = execute_sql(sql=s3GObjectSql, database='default')



고유한 사용자 이름 찾기(재미로)
data['username'].value_counts()

관찰하면 eventtime 열이 "문자열"데이터 유형이므로 날짜 변환을 수행하기가 어려울 것입니다. 따라서 여기에서는 datetime 데이터 유형과 drop eventtime으로 새 열을 생성합니다.

eventtime 열에 대한 문자열을 Datetime으로 변환
data['event_time'] = pd.to_datetime(data['eventtime'], errors='coerce')




data.drop('eventtime', axis=1, inplace=True)

event_time 칼럼에서 년, 월, 일 칼럼을 추출하여 추가해 보자. 이 변경으로 데이터를 Hive 파티션으로 S3에 다시 쓸 수 있습니다.

데이터 프레임에 새 필드 추출 및 추가
data['year'] = data['event_time'].dt.year
data['month'] = data['event_time'].dt.month
data['day'] = data['event_time'].dt.day

이제 AWS Data Wrangler s3.to_parquet API를 사용하여 연도, 월, 일 및 parquet 형식으로 분할된 S3에 데이터를 다시 쓸 수 있습니다. 또한 여기에 데이터베이스 및 테이블 파라미터를 추가하여 Athena/Glue 카탈로그에 메타데이터를 작성할 수 있습니다. 성공하려면 명령이 되려면 데이터베이스가 있어야 합니다.
wr.s3.to_parquet(
    df=data,
    path='s3://my-bucket/s3_access_analyzer/cloudtrail/',
    dataset=True,
    partition_cols=['year', 'month', 'day'],
    database='default',  # Athena/Glue database
    table='cloudtrail' # Athena/Glue table
)

Athena에 쿼리하여 결과를 볼 수 있습니다.



0KB의 데이터를 스캔하여 쿼리를 완료하는 데 단 1.74초가 걸렸습니다. 이제 왜 0KB입니까? 글쎄, 나는 당신이 생각하고 대답하도록 남겨 둘 것입니다 :)

결론적으로 AWS Data Wrangler를 사용하면 위와 같이 추출, 변환 및 로드(ETL) 작업을 쉽고 효율적으로 수행할 수 있습니다. 다른 AWS 서비스와 원활하게 통합되며 새로운 기능과 향상된 기능으로 활발하게 업데이트되고 있습니다.

좋은 웹페이지 즐겨찾기