APIGateway 로그 타임스탬프를 Lambda로 가공하여 Elasticsearch Service로 보내기

소개



APIGateway 로그를 Elasticsearch Service로 보내 Kibana에서 시각화하려고 할 때의 단계.
다만, kibana로 대응하고 있는 타임스탬프의 형식이 APIGateway가 출력하는 타임스탬프의 형태와 일치하고 있지 않았다.
그 때문에, Lambda로 타임 스탬프의 형식을 변환해 Kibana에서 타임 스탬프로서 취급할 수 있도록(듯이) 했다.

구성



APIGateway의 액세스 로그는 Kinesis에만 출력할 수 있기 때문에 Kinesis를 이용하고 있다.


절차



1.Lambda 만들기
2. Kinesis Data Firehose 만들기
3.APIGateway에서 액세스 로그를 출력하는 설정 추가
※Elasticsearch Service의 도메인은 작성 완료의 전제

1.Lambda 만들기



Lamnda 환경 변수에 로그 레벨로 "LOG_LEVEL"을 설정해야합니다.
아래는 로그 레벨이 디버그인 경우


타임 스탬프의 키는 APIGateway에서 설정한 "requestTime".

lambda_function.py
import json
import base64
import logging
from os import environ
from datetime import datetime as dt
from datetime import timedelta, timezone as tz

# 環境変数ログレベルのキー
LOG_LEVEL = 'LOG_LEVEL'
# タイムスタンプのキー
REQUEST_TIME_KEY = 'requestTime'

# ログオブジェクト作成
def get_logger(workername):
    # ログの出力名を設定
    logger = logging.getLogger(workername)
    # ログレベルの設定
    logger.setLevel(int(environ[LOG_LEVEL]))

    return logger

# ロガー
logger = get_logger(__name__)

# タイムスタンプの形式をKibana用に変換
def conv_timestamp(timestamp):
    # date型に変換
    date = dt.strptime(timestamp, '%d/%b/%Y:%H:%M:%S +0000')
    # 9時間進める(力技なのでイマイチ・・・)
    date = date + timedelta(hours=+9)
    # String型に変換して返却
    return dt.strftime(date, '%Y-%m-%dT%H:%M:%S+0900')

def lambda_handler(event, context):
    logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼event▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
    logger.debug(event)
    output_list = []
    for record in event['records']:
        data = record['data']
        # デコード
        data_decode = base64.b64decode(data)
        logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼data(変換前)▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
        logger.debug(data_decode)
        logger.debug('▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲data(変換前)▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲')

        # byte型をString型に変換
        data_str = data_decode.decode("ascii")
        # Json文字列を辞書型に変換
        data = json.loads(data_str)

        # タイムスタンプの形式をkibana用に変換
        data[REQUEST_TIME_KEY] = conv_timestamp(data[REQUEST_TIME_KEY])

        # 辞書型をjson文字列→base64エンコード→デコード
        record['data'] = base64.b64encode(json.dumps(data).encode('ascii')).decode("ascii")
        # resultを追加
        record['result'] = 'Ok'
        output_list.append(record)

        data_decode = base64.b64decode(record['data'])
        logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼data(変換後)▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
        logger.debug(data_decode)
        logger.debug('▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲data(変換後)▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲')

    logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼返却するLIST▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
    logger.debug(output_list)
    logger.debug('▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲返却するLIST▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲')

    return {'records': output_list}

Kinesis로 데이터를 반환하기 위해 AWS 관리형 정책에 'AWSLambdaKinesisExecutionRole' 추가

2. Kinesis Data Firehose 만들기



'Choose a Souce'는 'Direct PUT or other sources'를 선택한다.


"record transformation"은 "Enabled"를 선택하고 작성한 가공용 Lambda를 선택한다.


Amazon Elasticsearch Service destination의 도메인에서 생성된 도메인을 선택합니다.
「Index」는 임의의 이름.
「Index rotation」, 일별로 하는 경우는, 「Every day」를 선택한다.


또한 Elasticsearch Service에 로그를 보내고 Lambda를 실행하기 위해 아래의 권한을 정책으로 추가한다.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "es:ESHttpPost",
                "es:ESHttpPut"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "lambda:GetFunctionConfiguration"
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

이상의 설정을 하여 작성한다.

3.APIGateway에서 액세스 로그를 출력하는 설정 추가



액세스 로그를 출력하는 API 스테이지 > 로그 추적 탭
Access Log Destination ARN: 1.에서 만든 Kinesis의 ARN
로그 형식: JSON 버튼을 눌러 설정



이것으로 구성의 그림대로 완성입니다.

마지막으로



Kinesis의 서비스를 이번에 처음 만졌습니다.
어려운 것이라고 생각했지만 데이터를 모아주는 간단한 서비스라고 이해할 수 있었습니다.
다만 도쿄 리전에서도 모두 영어이므로, 초학자의 허들을 조금 올리고 있다고 생각했습니다.

좋은 웹페이지 즐겨찾기