Filebeat multiline log collection by keyword (multiline and include_lines)

Many colleagues assume Filebeat cannot handle multiline logs when collecting them, so today let's walk through Filebeat's multiline and include_lines options.
 
First, a simple case: given the log below, we want to collect only the lines containing "error".
2017/06/22 11:26:30 [error] 26067#0: *17918 connect() failed (111: Connection refused) while connecting to upstream, client: 192.168.32.17, server: localhost, request: "GET /wss/ HTTP/1.1", upstream: "http://192.168.12.106:8010/", host: "192.168.12.106"
2017/06/22 11:26:30 [info] 26067#0:
2017/06/22 12:05:10 [error] 26067#0: *17922 open() "/data/programs/nginx/html/ws" failed (2: No such file or directory), client: 192.168.32.17, server: localhost, request: "GET /ws HTTP/1.1", host: "192.168.12.106"

The filebeat.yml configuration is as follows:
filebeat.prospectors:
- input_type: log
  paths:
    - /tmp/test.log
  include_lines: ['error']
output.kafka:
  enabled: true
  hosts: ["192.168.12.105:9092"]
  topic: logstash-errors-log

Check the Kafka queue:
As expected, only the log lines containing the "error" keyword were collected.
{"@timestamp":"2017-06-23T08:57:25.227Z","beat":{"name":"192.168.12.106"},"input_type":"log","message":"2017/06/22 12:05:10 [error] 26067#0: *17922 open() /data/programs/nginx/html/ws failed (2: No such file or directory), client: 192.168.32.17, server: localhost, request: GET /ws HTTP/1.1, host: 192.168.12.106","offset":30926,"source":"/tmp/test.log","type":"log"}
{"@timestamp":"2017-06-23T08:57:32.228Z","beat":{"name":"192.168.12.106"},"input_type":"log","message":"2017/06/22 12:05:10 [error] 26067#0: *17922 open() /data/programs/nginx/html/ws failed (2: No such file or directory), client: 192.168.32.17, server: localhost, request: GET /ws HTTP/1.1, host: 192.168.12.106","offset":31342,"source":"/tmp/test.log","type":"log"}

Multiline case:
[2016-05-25 12:39:04,744][DEBUG][action.bulk              ] [Set] [***][3] failed to execute bulk item (index) index {[***][***][***], source[{***}}
MapperParsingException[Field name [events.created] cannot contain '.']
    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:273)
    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)
    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parse(ObjectMapper.java:193)
    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:305)
    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)
    at org.elasticsearch.index.mapper.object.RootObjectMapper$TypeParser.parse(RootObjectMapper.java:139)
    at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:118)
    at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:99)
    at org.elasticsearch.index.mapper.MapperService.parse(MapperService.java:498)
    at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:257)
    at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:230)
    at org.elasticsearch.cluster.service.InternalClusterService.runTasksForExecutor(InternalClusterService.java:468)
    at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:772)
    at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:231)
    at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:194)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The filebeat.yml configuration is as follows:
filebeat.prospectors:
- input_type: log
  paths:
    - /tmp/test.log
  multiline:
    pattern: '^\['
    negate: true
    match: after
  fields:
    beat.name: 192.168.12.106
  fields_under_root: true
output.kafka:
  enabled: true
  hosts: ["192.168.12.105:9092"]
  topic: logstash-errors-log
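
For reference, this multiline block reads roughly as follows: pattern '^\[' marks the start of a new event (the timestamped line), negate: true means lines that do NOT match the pattern are continuation lines, and match: after appends those continuation lines to the preceding matching line. The same block with comments:
  multiline:
    # a new event starts with '[' (the timestamp line)
    pattern: '^\['
    # lines NOT matching the pattern are treated as continuations...
    negate: true
    # ...and are appended after the line that matched
    match: after
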
The Kafka queue shows:

{"@timestamp":"2017-06-23T09:09:02.887Z","beat":{"name":"192.168.12.106"},"input_type":"log",
"message":"[2016-05-25 12:39:04,744][DEBUG][action.bulk              ] [Set] [***][3] failed to execute bulk item (index) index {[***][***][***], source[{***}}
MapperParsingException[Field name [events.created] cannot contain '.']
    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:273)
     at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)
     at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parse(ObjectMapper.java:193)
     at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:305)
     at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:218)
     at org.elasticsearch.index.mapper.object.RootObjectMapper$TypeParser.parse(RootObjectMapper.java:139)
     at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:118)
     at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:99)
     at org.elasticsearch.index.mapper.MapperService.parse(MapperService.java:498)
     at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:257)
     at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:230)
    at org.elasticsearch.cluster.service.InternalClusterService.runTasksForExecutor(InternalClusterService.java:468)
     at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:772)
     at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:231)
     at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:194)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
     at java.lang.Thread.run(Thread.java:745)



","offset":35737,"source":"/tmp/test.log","type":"log"}

You can see that multiline stitches the multi-line stack trace into a single event.
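
If a single event can grow very long, Filebeat also offers multiline.max_lines and multiline.timeout to bound how much gets joined; as far as I recall from the Filebeat docs the defaults are around 500 lines and 5s, but check your version. A sketch with purely illustrative values:
  multiline:
    pattern: '^\['
    negate: true
    match: after
    # illustrative limits: lines beyond max_lines are dropped from the event,
    # and a pending event is flushed once the timeout expires
    max_lines: 1000
    timeout: 10s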
Next, multiline and include_lines used together.
The filebeat.yml configuration is as follows:
filebeat.prospectors:
- input_type: log
  paths:
    - /tmp/test.log
  include_lines: ['error']
  multiline:
    pattern: '^\['
    negate: true
    match: after
output.kafka:
  enabled: true
  hosts: ["192.168.12.105:9092"]
  topic: logstash-errors-log
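
One detail worth noting, going by the Filebeat documentation: when multiline is configured, the lines are combined into a single event first, and include_lines is then applied to the combined event, which is why a stack trace whose first line contains "error" gets shipped in full. The filtering part of the prospector above, with comments:
  # applied to the already-combined multiline event, not to each raw line
  include_lines: ['error']
  # joins continuation lines (those not starting with '[') onto the preceding event
  multiline:
    pattern: '^\['
    negate: true
    match: after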

In other words, when an event in the log contains the "error" keyword, the combined multiline event is sent to Kafka.
In practice, when the log is written continuously, lines without "error" can also end up merged into the shipped event; when entries arrive with gaps between them, the filtering works fairly well. Whether this is good enough depends on your specific workload.
In short, Filebeat can merge multiple lines and collect logs by keyword.
