CloudFront 액세스 로그를 Elasticsearch Service 6.8에 포함하는 메모(Lambda Python 3.8)

ALB 마찬가지로 CloudFront 로그 캡처에 대해서도 Elasticsearch Service 6.8/Lambda Python 3.8 지원 메모를 남겨 둡니다.

이 기사의 업데이트입니다.
  • CloudFront 액세스 로그를 Elasticsearch Service(6.0/6.2)로 가져오기

  • ※ 거의 ALB 과 같은 순서입니다.

    절차



    1. Amazon Elasticsearch Service 시작



    ALB의 전 기사 와 같습니다 (생략).

    2. Ingest Pipeline 설정



    CloudFront 원본 기사 과 같습니다만 재게재합니다.

    설정(CloudFront용)
    $ curl -H "Content-Type: application/json" -XPUT 'https://XXXXX-XXXXXXXXXXXXXXXXXXXXXXXXXX.ap-northeast-1.es.amazonaws.com/_ingest/pipeline/cflog' -d '{
      "processors": [{
        "grok": {
          "field": "message",
          "patterns":[ "%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:x_edge_location} (?:%{INT:sc_bytes:int}|-) %{IPORHOST:c_ip} %{WORD:cs_method} %{HOSTNAME:cs_host} %{NOTSPACE:cs_uri_stem} %{INT:sc_status:int} %{GREEDYDATA:referrer} %{GREEDYDATA:User_Agent} %{GREEDYDATA:cs_uri_query} %{GREEDYDATA:cookies} %{WORD:x_edge_result_type} %{NOTSPACE:x_edge_request_id} %{HOSTNAME:x_host_header} %{URIPROTO:cs_protocol} (?:%{INT:cs_bytes:int}|-) %{NUMBER:time_taken:float} %{NOTSPACE:x_forwarded_for} %{NOTSPACE:ssl_protocol} %{NOTSPACE:ssl_cipher} %{WORD:x_edge_response_result_type}" ],
          "ignore_missing": true
        }
      },{
        "remove":{
          "field": "message"
        }
      }, {
        "user_agent": {
          "field": "User_Agent",
          "target_field": "user_agent",
          "ignore_failure": true
        }
      }, {
        "remove": {
          "field": "User_Agent"
        }
      }]
    }'
    

    3. AWS Lambda 함수 생성



    IAM Role 만들기



    ALB의 전 기사 와 같습니다 (생략).

    Lambda 함수 만들기


  • .zip 파일 생성(Amazon Linux 2에서)

  • Lambda용 업로드 파일.zip화
    $ python3 -m venv dev
    $ . dev/bin/activate
    (dev) $ mkdir cf_log_to_es_s3
    (dev) $ cd cf_log_to_es_s3/
    (dev) $ pip install requests requests_aws4auth -t ./
    (省略)
    (dev) $ rm -rf *.dist-info
    (dev) $ vi lambda_function.py
    (ここで「lambda_function.py」のコードを入力)
    (dev) $ zip -r ../cflog.zip *
    
  • 코드(Python 3.8용)

  • lambda_function.py
    import boto3
    import os
    import gzip
    import requests
    from datetime import datetime
    from requests_aws4auth import AWS4Auth
    
    def lambda_handler(event, context):
        print('Started')
        es_index = os.environ['ES_INDEX_PREFIX'] + "-" + datetime.strftime(datetime.now(), "%Y%m%d")
        region = os.environ["AWS_REGION"]
        service = 'es'
        credentials = boto3.Session().get_credentials()
        awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
    
        bucket = event["Records"][0]["s3"]["bucket"]["name"]
        key = event["Records"][0]["s3"]["object"]["key"]
    
        s3 = boto3.resource('s3')
        s3.Bucket(bucket).download_file(key, '/tmp/log.gz')
    
        with gzip.open('/tmp/log.gz', mode='rt') as f:
            data = ""
    
            for line in f:
                if not line.strip().startswith('#'):
                    data += '{"index":{"_index":"%s","_type":"log"}}\n' % es_index
                    data += '{"message":"%s"}\n' % line.strip().replace('"', '\\"').replace('\t', ' ').replace(' ', 'T', 1).replace(' ', 'Z ', 1)
    
                    if len(data) > 3000000:
                        _bulk(data, awsauth)
                        data = ""
    
            if data != "":
                _bulk(data, awsauth)
    
        return 'Completed'
    
    def _bulk(data, awsauth):    
        es_host = os.environ['ES_HOST']
        pipeline = os.environ['PIPELINE_NAME']
        url = 'https://%s/_bulk?pipeline=%s' % (es_host, pipeline)
    
        headers = {'Content-Type': 'application/json'}
        response = request(url, awsauth, headers=headers, data=data)
    
        if response.status_code != requests.codes.ok:
            print(response.text)
    
    def request(url, awsauth, headers=None, data=None):
        return requests.post(url, auth=awsauth, headers=headers, data=data)
    
  • 환경 변수

  • CloudFront 원본 기사 과 같습니다만 재게재합니다.


  • 액세스 로그가 저장된 S3 버킷 트리거
  • 람다 실행 롤 타임 아웃 시간, 메모리 크기 및 네트워크 설정

  • 등은 ALB의 전 기사 와 같습니다 (생략).

    좋은 웹페이지 즐겨찾기