Python 로그 분석

프로젝트 현황 소개: Python 3.6.6 을 바탕 으로 nginx 에 접근 하 는 로그 분석 코드 를 실현 하고 로그 의 코드 비례 통계 와 브 라 우 저 유형 과 방문 상황 통 계 를 실현 하 는 코드 세그먼트 는 다음 과 같 습 니 다. 1. 창문 함 수 를 작성 하여 일정한 시간 내 에 데 이 터 를 분석 합 니 다. 2. 정규 표현 식 을 통 해 로 그 를 일치 시 키 고 로그 파일 을 불 러 옵 니 다.텍스트 의 줄 마다 로그 정 보 를 추출 합 니 다. 3. 소비 단 코드 를 작성 합 니 다. 추출 한 데 이 터 는 소비 단 코드 에 따라 처리 할 수 있 습 니 다. 4. 메시지 배포 코드 를 통 해 이 루어 집 니 다. quue 를 통 해 추출 한 텍스트 를 대기 열 에 넣 고 소비 단 코드 로 항목 코드 를 처리 할 수 있 습 니 다.
import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path
from user_agents import parse
"""
    ,             ,     handler(      )
              ,         ,     code          
    ,     ,data = src.get(),           ,      ,         ,      
"""
pattern = '''(?P[\d.]{7,}\s-\s-\s\[(?P[^\[\]]+)\])\s\
"(?P.*)\s(?P.*)\s(?P.*)"\s(?P\d{3})\s(?P\d+)\s"[^"]+"\s"(?P[^"]+)"'''

#  
regex = re.compile(pattern)

#    
ops = {
    'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int,
    'useragent': lambda ua: parse(ua)
}

#    
def extract(line: str) -> dict:
    matcher = regex.match(line)
    if matcher:
        return {name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()}

#     
def openfile(path: str):
    """      """
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                continue

#    ,            
def load(*paths):
    for item in paths:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():
                    yield from openfile(str(file))
        elif p.is_file():
            yield from openfile(str(p))

#     100     
def source(second=1):
    """    """
    while True:
        yield {
            'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
            'value': random.randint(1, 100)
        }
        time.sleep(second)

#       
def window(src: Queue, handler, width: int, interval: int):
    '''
        ,                   
    :param src:   ,       ,      
    :param handler:       
    :param width:      , 
    :param interval:      , 
    '''
    start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
    current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')
    buffer = []
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        #         
        data = src.get()  #        ,      ,         
        if data:
            buffer.append(data)
            current = data['datetime']  #           

        #   interval    buffer      
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            start = current
            #     width   
            buffer = [x for x in buffer if x['datetime'] > current - delta]

#           
source()
def handler(iterable):
    #return sum(map(lambda x: x['value'], iterable)) / len(iterable)
    print(sum(map(lambda x:x['value'],iterable))/len(iterable))

#     
def donothing_handler(iterable):
    #return iterable
    print(iterable)

#      
def status_handler(iterable):
    #           
    status = {}
    for item in iterable:
        key = item['status']
        status[key] = status.get(key, 0) + 1
    total = len(iterable)
    print({k:float( "{:.2f}".format(status[k] / total)) for k, v in status.items()})
    return {k: status[k] / total for k, v in status.items()}

#      
allbrowsers = {}

def browser_handler(iterable):
    browsers = {}
    for item in iterable:
        ua = item['useragent']
        key = (ua.browser.family, ua.browser.version_string)
        browsers[key] = browsers.get(key, 0) + 1
        allbrowsers[key] = allbrowsers.get(key, 0) + 1

    print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10])
    return browsers

#    
def dispatcher(src):
    #       handler,         
    handlers = []
    queues = []

    def reg(handler, width: int, interval: int):
        """
                
        :param handler:        
        :param width:      
        :param interval:    
        """
        q = Queue()
        queues.append(q)
        #    ,    
        h = threading.Thread(target=window, args=(q, handler, width, interval))
        handlers.append(h)

    def run():
        #         
        for t in handlers:
            t.start()

        #                 
        for item in src:
            for q in queues:
                q.put(item)
                # print(q.get())

    return reg, run

if __name__ == "__main__":
    import sys

    path = '/tmp/test.log'
    """
              ,      5s  10s          
    reg, run = dispatcher(source())
    reg(handler, 10, 5)
    """

    reg, run = dispatcher(load(path))

    #  5s    10s   ,      
    reg(donothing_handler, 10, 5)
    #  5s  10s            
    reg(status_handler, 10, 5)
    #   5s  10s           ,    10s          
    reg(browser_handler,10,5)
    run()

좋은 웹페이지 즐겨찾기