Traffic Monitoring with Kibana, ElasticSearch and Python

Motivation


  • I want an easier way to visualize traffic than tshark or wireshark.
  • A new version of Kibana has been released, so I want to use it.

  • Dashboard




  • Visualize the total volume of packets in "real time"
  • Visualize each packet's Source IP, Destination IP, Source Port, and Destination Port

  • ElasticSearch Install


    $ brew install elasticsearch
    $ elasticsearch -v
    Version: 1.4.4, Build: c88f77f/2015-02-19T13:05:36Z, JVM: 1.7.0_72
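
    The script below talks to ElasticSearch over HTTP on localhost:9200, the
    default port. As a minimal sketch (assuming the node has been started),
    the same requests library used by the script can confirm it is reachable:

    import requests

    # Root endpoint of a running ElasticSearch node; returns basic
    # cluster and version information as JSON.
    resp = requests.get("http://localhost:9200")
    print(resp.status_code)  # 200 when the node is up
    print(resp.json())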
    

    Download Kibana


    $ wget https://download.elasticsearch.org/kibana/kibana/kibana-4.0.1-darwin-x64.tar.gz
    $ tar zxvf kibana-4.0.1-darwin-x64.tar.gz
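
    After extracting the archive, Kibana 4 is started with the bin/kibana
    script inside the extracted directory; it serves its UI on port 5601 by
    default.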
    
    

    Python Library


    $ pip install pyshark elasticsearch requests
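
    Note that pyshark is a thin wrapper around the tshark binary, so the
    Wireshark command-line tools must be installed as well. A minimal capture
    sketch (the interface name "en0" is just a placeholder):

    import pyshark

    # Capture a handful of packets from an example interface and print the
    # highest protocol layer of each one.
    capture = pyshark.LiveCapture(interface="en0")
    for packet in capture.sniff_continuously(packet_count=5):
        print(packet.highest_layer)  # e.g. TCP, UDP, DNS, ...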
    

    Execute


    $ python packet_cap_es.py <interface>
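
    Capturing live traffic on an interface usually requires elevated
    privileges, so the script may need to be run with sudo.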
    
  • Open localhost:5601 in a browser
  • Configure an index pattern of packet* on Kibana's Settings page (the sketch below shows a quick way to confirm that packet documents are arriving)
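
    Before building visualizations, a quick way to confirm that packet
    documents are actually arriving is to ask ElasticSearch for a document
    count; a sketch, assuming the defaults used by the script:

    import requests

    # The script indexes into the "packets" index; _count reports how many
    # documents it currently holds.
    resp = requests.get("http://localhost:9200/packets/_count")
    print(resp.json()["count"])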

  • Python Script


    """
    This app captures packets and extract five tupels.
    Store these data to elastic search.
    Elastic search and kibana creates real time packet monitering
    bashbord.
    """
    import json
    import sys
    import datetime
    import time
    
    import pyshark
    import requests
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    
    URL = "http://localhost:9200"
    INDEX_URL = URL + "/packets"
    TYPE_URL = INDEX_URL + "/packet"
    ACTION = {"_index" : "packets",
              "_type" : "packet",
              "_source": {}
             }
    
    
    def delete_index():
        """Delete an index in elastic search."""
        requests.delete(INDEX_URL)
    
    
    def create_index():
        """Create an index in elastic search with timestamp enabled."""
        requests.put(INDEX_URL)
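        # Mapping for the "packet" type: take each document's timestamp from
        # the capture_timestamp field, and keep the IP fields as not_analyzed
        # strings so Kibana aggregates whole addresses instead of tokens.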
        setting = {"packet" : {
                    "_timestamp" : {
                        "enabled" : True,
                        "path" : "capture_timestamp",
                    },
                    "numeric_detection" : False,
                    "properties" : {
                        "dstip" : { "type":"string",
                                    "index" : "not_analyzed",
                                    "store" : True},
                        "srcip" : { "type":"string",
                                    "index" : "not_analyzed",
                                    "store" : True}
                    }
                }}
        # Retry the mapping PUT for a while: ES may still be starting up or
        # the index may not be ready yet.
        for _ in range(1, 100):
            try:
                requests.put(TYPE_URL + "/_mapping", data=json.dumps(setting))
                break
            except requests.exceptions.RequestException:
                time.sleep(1)
    
    def main():
        """Extract packets and store them to ES"""
        capture = pyshark.LiveCapture(interface=sys.argv[1])
        packet_que = list()
        es = Elasticsearch()
    
        end_time = None
        for packet in capture.sniff_continuously():
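            # Only TCP and UDP packets carry the port fields extracted below.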
            if packet.transport_layer in ("UDP", "TCP"):
                try:
                    # sniff_timestamp is a Unix epoch value; convert it to UTC
                    # so the "Z"-suffixed string below is accurate and Kibana
                    # shifts it to the browser's local time on display.
                    utc_time = datetime.datetime.utcfromtimestamp(
                        float(packet.sniff_timestamp))
                    timestamp = utc_time.strftime("%Y-%m-%dT%H:%M:%SZ")
                    version = int(packet[1].version)
                    # ip v6 does not have protocol. It has next header instead.
                    if version == 4:
                        protocol = int(packet[1].proto)
                    elif version == 6:
                        protocol = int(packet[1].nxt)
                    else:
                        protocol = None
    
                    dstip = packet[1].dst
                    srcip = packet[1].src
                    dstport = int(packet[2].dstport)
                    srcport = int(packet[2].srcport)
                    parsed_packet = dict(version=version, protocol=protocol,
                                         dstip=dstip, srcip=srcip,
                                         dstport=dstport, srcport=srcport,
                                         capture_timestamp=timestamp)
                    # For historical graph
                    parsed_packet["@timestamp"] = timestamp
                    # ACTION.copy() is shallow, so every queued entry would end
                    # up sharing the same _source dict; build a fresh action
                    # per packet instead.
                    action = dict(ACTION, _source=parsed_packet)
                    packet_que.append(action)
                    current = time.time()
                    # Flush the queue to ES in bulk at most once every 3 seconds.
                    if end_time is None or current - end_time >= 3:
                        helpers.bulk(es, packet_que)
                        del packet_que[0:len(packet_que)]
                        end_time = time.time()
    
                except Exception:
                    # Skip packets that are missing the expected layers/fields.
                    time.sleep(1)
    
    
    if __name__ == "__main__":
        if len(sys.argv) != 2:
            sys.stderr.write("usage: python packet_cap_es.py <interface>\n")
            sys.exit(1)
        delete_index()
        create_index()
        main()
    
    
