트래픽 모니터링 with Kibana, ElasticSearch and Python
16123 단어 파이썬trafficElasticsearch키바나
Motivation
대시보드
ElasticSearch Install
$ brew install elasticsearch
$ elasticsearch -v
Version: 1.4.4, Build: c88f77f/2015-02-19T13:05:36Z, JVM: 1.7.0_72
Download Kibana
$ wget https://download.elasticsearch.org/kibana/kibana/kibana-4.0.1-darwin-x64.tar.gz
$ tar zxvf kibana-4.0.1-darwin-x64.tar.gz
Python Library
$ pip install pyshark elasticsearch requests
Execute
$ python packet_cap_es.py <interface>
파이썬 스크립트
"""
This app captures packets and extract five tupels.
Store these data to elastic search.
Elastic search and kibana creates real time packet monitering
bashbord.
"""
import json
import sys
import datetime
import time
import pyshark
import requests
from elasticsearch import Elasticsearch
from elasticsearch import helpers
URL = "http://localhost:9200"
INDEX_URL = URL + "/packets"
TYPE_URL = INDEX_URL + "/packet"
ACTION = {"_index" : "packets",
"_type" : "packet",
"_source": {}
}
def delete_index():
"""Delete an index in elastic search."""
requests.delete(INDEX_URL)
def create_index():
"""Create an index in elastic search with timestamp enabled."""
requests.put(INDEX_URL)
setting = {"packet" : {
"_timestamp" : {
"enabled" : True,
"path" : "capture_timestamp",
},
"numeric_detection" : False,
"properties" : {
"dstip" : { "type":"string",
"index" : "not_analyzed",
"store" : True},
"srcip" : { "type":"string",
"index" : "not_analyzed",
"store" : True}
}
}}
for _ in range(1, 100):
try:
r = requests.put(TYPE_URL + "/_mapping", data=json.dumps(setting))
break
except:
time.sleep(1)
pass
def main():
"""Extract packets and store them to ES"""
capture = pyshark.LiveCapture(interface=sys.argv[1])
packet_que = list()
es = Elasticsearch()
end_time = None
for packet in capture.sniff_continuously():
if packet.transport_layer in ("UDP", "TCP"):
try:
# Why does ES add 9 hours automatically?
localtime = float(packet.sniff_timestamp) - 60 * 60 * 9 # GMT + 9
row_timestamp = datetime.datetime.fromtimestamp(localtime)
timestamp = row_timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")
version = int(packet[1].version)
# ip v6 does not have protocol. It has next header instead.
if version == 4:
protocol = int(packet[1].proto)
elif version == 6:
protocol = int(packet[1].nxt)
else:
protocol = None
dstip = packet[1].dst
srcip = packet[1].src
dstport = int(packet[2].dstport)
srcport = int(packet[2].srcport)
parsed_packet = dict(version=version, protocol=protocol,
dstip=dstip, srcip=srcip,
dstport=dstport, srcport=srcport,
capture_timestamp=timestamp)
# For historical graph
parsed_packet["@timestamp"] = timestamp
action = ACTION.copy()
action["_source"].update(parsed_packet)
packet_que.append(action)
current = time.time()
while(end_time is None or current - end_time >= 3):
helpers.bulk(es, packet_que)
del packet_que[0:len(packet_que)]
end_time = time.time()
break
except Exception as e:
time.sleep(1)
if __name__ == "__main__":
if len(sys.argv) != 2:
print >>sys.stderr, "python packet_cap_es.py <interface>"
exit(1)
delete_index()
create_index()
main()
Reference
이 문제에 관하여(트래픽 모니터링 with Kibana, ElasticSearch and Python), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/giwa/items/a24c722d8bc1b9a428cb텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)