Logstash Input S3 플러그인으로 CloudFront 로그 가져오기

업데이트 : 7.7.0에서 작동

소개


  • CloudFront의 로그를 좋은 느낌 (모든 필드를 깨끗이 Visualize)에 Elasticsearch로 시각화하고 싶다
  • CloudFront의 c_ip or x_forwarded_for를 Geo IP로 위도/경도를 얻고 싶습니다. 국가 이름, 도시 이름 등을 분석하기 위해 로그에 저장하고 싶습니다

  • Elastic Cloud에 로그를 캡처하고 싶습니다

  • Amazon S3에 저장된 로그 데이터를 캡처하고 싶습니다

  • 전제


  • Logstash가 인스톨 되고 있는 EC2/ECS등이 벌써 움직이고 있다
  • 섭취를 견딜 수 있는 사양

  • S3에 CloudFront 로그가 저장됩니다
  • 로그를 캡처하는 Elasticsearch가 있습니다
  • Logstash, Elasticsearch를 이미 사용하고 있으며 설정을 어느 정도 알 수 있습니다

  • 버전


  • Logstash-7.4.2
  • Elasticsearch-7.4.2
  • Kibana-7.4.2
  • update::7.7.0에서도 작동 문제 없음

  • 쓰지 않은 것


  • Logstash 설정 (pipeline.yml/logstash.yml 등)
  • Elasticserach 설정
  • CloudFront에서 S3으로 로그를 내보내는 방법

  • 동작 구성





    설정



    CloudFront Log 데이터


    date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type sc-content-type sc-content-len sc-range-start sc-range-end
    

    Grok pattern %{CF_ACCESS_LOG}로 설정


    CF_ACCESS_LOG %{DATE_EU:date}\t%{TIME:time}\t(?<x_edge_location>\b[\w\-]+\b)\t(?:%{NUMBER:sc_bytes:int}|-)\t%{IP:c_ip}\t%{WORD:cs_method}\t%{HOSTNAME:cs_host}\t%{NOTSPACE:cs_uri_stem}\t%{NUMBER:sc_status:int}\t%{GREEDYDATA:referrer}\t%{GREEDYDATA:User_Agent}\t%{GREEDYDATA:cs_uri_stem}\t%{GREEDYDATA:cookies}\t%{WORD:x_edge_result_type}\t%{NOTSPACE:x_edge_request_id}\t%{HOSTNAME:x_host_header}\t%{URIPROTO:cs_protocol}\t%{INT:cs_bytes:int}\t%{GREEDYDATA:time_taken}\t%{GREEDYDATA:x_forwarded_for}\t%{GREEDYDATA:ssl_protocol}\t%{GREEDYDATA:ssl_cipher}\t%{GREEDYDATA:x_edge_response_result_type}\t%{GREEDYDATA:cs_protocal_version}\t%{GREEDYDATA:file_status}\t%{GREEDYDATA:file_encrypted_fields}\t%{GREEDYDATA:c_port}\t%{GREEDYDATA:time_to_first-byte}\t%{GREEDYDATA:x_edge_detailed_result_type}\t%{GREEDYDATA:sc_content_type}\t%{GREEDYDATA:sc_content_len}\t%{GREEDYDATA:sc_range_start}\t%{GREEDYDATA:sc_range_end}
    

    Grok Debugger를 사용하면 좋은 느낌



    Kibana → Dev Tools → Grok Debugger를 엽니다. 실제 로그 데이터를 Sample Data로 설정하고 Grok Pattern을 확인합니다. 틀리지 않으면 Structured Data로서 결과를 확인할 수 있다. 예기치 않은 형태로 Elasticsearch에 도입되지 않아도 된다.


    cloudfront.conf



    Logstash의 Pipeline에서 cloudfront.conf를 로드할 수 있도록 합니다.

    입력 설정


    input {
      s3 {
        access_key_id => "XXXXXXXXXXXXXXX"
        secret_access_key => "XXXXXXXXXXXXXXXXXXXXXXXXXX"
        bucket => "s3-bucket-name"
        region => "ap-northeast-1"
        interval => "60"
        watch_for_new_files => true
      }
    }
    

    filter 설정



    Grok pattern을 매개 변수로 로드할 수 있는 설정
    patterns_dir => ["/etc/logstash/patterns"]
    

    grok을 한 후에는 각종 플러그인을 이용하여 로그를 가공한다. x_forwarded_for가 없는 경우는 c_ip로부터 geo_location을 취득해, 어떤 경우는 x_forwarded_for를 이용하는 설정. 마지막으로 중복되는 데이터를 삭제합니다. useragent는 User Agent를 좋은 느낌으로 해주는 플러그인.
    filter {
      grok {
            patterns_dir => ["/etc/logstash/patterns"]
            match => [ "message", "%{CF_ACCESS_LOG}" ]
         }
      mutate {
        add_field => [ "listener_timestamp", "%{date} %{time}" ]
      }
    
      date {
        match => [ "listener_timestamp", "yy-MM-dd HH:mm:ss" ]
        timezone => "UTC"
        target => "@timestamp"
      }
    
      if [x_forwarded_for] == "-" {
        geoip {
          source => "c_ip"
        }
      } else {
        geoip {
          source => "x_forwarded_for"
        }
      }
    
      useragent {
        source => "User_Agent"
        target => "useragent"
      }
    
      mutate {
        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
      }
    
      mutate {
        convert => [ "[geoip][coordinates]", "float" ]
      }
    
      mutate {
        remove_field => ["date", "time", "listener_timestamp", "cloudfront_version", "message", "cloudfront_fields", "User_Agent"]
      }
    }
    

    출력 설정



    Elastic Cloud로 전송할 설정. user/passwod,hosts는 적절히 수정. index명은 일자 단위로 설정
    output {
           elasticsearch {
               user  => xxxxxxx
               password => xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
               hosts => "https://xxxxxxxxxxxxxxxxxxxxxxxxxxxx:12345"
               index => "cloudfront-%{+yyyy.MM.dd}"
               }
    }
    

    sincedb를 수정하여 캡처하고 싶은 곳에서 가져올 수 있습니다.



    S3를 잠시 이용한 상태에서 아무것도 생각하지 않고 실행하면 과거의 로그가 대량으로 캡처되어 버리는 경우도 있다. 캡처하려는 타이밍은 sincedb를 수정하여 변경할 수 있습니다. UTC로 걸리므로 주의.
    # vi /${path}/plugins/inputs/s3/sincedb_8171845adf3152f179a1c22e493a62c4"}
    # 2019-12-12 01:22:10 +0000
    

    가져온



    가져온 로그를 Visualize하면. 좋은 느낌으로 분석을 할 수 있을 것 같다.



    주의점



    CloudFront의 로그 포맷이 바뀌면 Grok이 실패해, tag에 _grokparsefailure가 세트 되어 로그를 받아들일 수 없게 된다. 그렇게 빈번하지는 않지만, 로그를 감시(Watcher)를 넣어 tag의 값을 경고해 두면 편리.

    참고



  • htps //w w. 에스 c. 코/구이데/엔/ぉgs한 sh/쿤 t/인프 tp㎅긴 s. HTML
  • htps //w w. 에스 c. 코/구이데/엔/ぉgs한 sh/7. HTML

  • htps : // / cs. 아 ws. 아마존. 이 m / 그럼 _ jp / 아마 존 C ぉ dF 롱 t / ㄴ st / ゔ ぇ ぺ ぺ ぐ い / 어쩌 ss gs. HTML -
  • 좋은 웹페이지 즐겨찾기