로그 수집 rsyslog to kafka
14190 단어 __Linux -- 명령
글자 수
읽다
평론
좋아 하 다
프로젝트 는 로 그 를 수집 하여 저장 분석 을 해 야 합 니 다. 데이터 의 흐름 은 rsyslog (수집) - > kafka (메시지 큐) - > logstash (청소) - > es, hdfs 입 니 다.오늘 우 리 는 먼저 rsyslog 를 어떻게 이용 하여 로 그 를 kafka 에 수집 할 것 입 니까?
환경 준비
통과 rsyslog 공식 문서 보기, rsyslog 가 kafka 에 대한 지원 이 v 8.7.0 버 전 이라는 것 을 알 고 지원 합 니 다. ChangeLog V8. X 버 전의 변 화 를 알 수 있 습 니 다. 최신 V8 안정 판 은 RPM 패 키 지 를 제공 하 는 Rsyslog - kafka 플러그 인 입 니 다. 직접 yum 을 설치 하면 됩 니 다. yum 소스 를 추가 합 니 다.
[rsyslog_v8]
name=Adiscon CentOS-$releasever - local packages for $basearch
baseurl=http://rpms.adiscon.com/v8-stable/epel-$releasever/$basearch
enabled=1
gpgcheck=0
gpgkey=http://rpms.adiscon.com/RPM-GPG-KEY-Adiscon
protect=1
추가 후
yum install rsyslog rsyslog-kafka.x86_64
설 치 를 완료 할 수 있 습 니 다.배치
1. 처리 원칙
input submit received messages to rulesets, zero or many
ruleset contains rule, rule consist of a filter and an action list
actions consist of the action call itself (e.g. ”:omusrmsg:”) as well as all action-defining configuration statements ($Action... directives)
2. Statement Types 표현 식 형식
보통 RainerScript type statements 를 이용 하여 매우 간단명료 한 설정 성명 을 합 니 다. 예 를 들 어:
mail.info /var/log/mail.log
3. 프로 세 스 제어
Control structures
필터 조건 1) Selector: 전통 적 인 방식, 형식 은 다음 과 같 습 니 다.
[,facility...][,*].[=,!][,priority...][,*];[,facility...][,*].[=|!][,priority...][,*]...
그 중에서 기본 facility 는 auth, authpriv, cron, daemon, kern, lpr, mail, mark, news, security (same as auth), syslog, user, uucp and local 0 through local 7 입 니 다.기본 priority 는 debug, info, notice, warning, warn (same as warning), err, err (same as err), crit, alert, emerg, panic (same as emerg) 입 니 다.2) Property - based filers: new filter type. 형식 은 다음 과 같 습 니 다. :property, [!]compare-operation, "value"
각각 이름, 비교 문자, 대비 해 야 할 필드 입 니 다.비교 부 호 는 contains, isequal, startswith, regex, ereregex 3) 를 포함한다. Expression based filters: if expr then action-part-of-selector-line
4) BSD - style blocks: 5) 예: if $syslogfacility-text == 'local0' and $msg startswith 'DEVNAME' and not ($msg contains 'error1' or $msg contains 'error0') then /var/log/somelog
4. 데이터 처리: set, unset, reset 작업 지원
비고: 메시지 json (CEE / Lumberjack) 속성 만 변경 할 수 있 습 니 다. set, unset andreset statements
5. input
여러 종류의 input 모듈 이 있 습 니 다. 저 희 는 imfile 모듈 을 예 로 들 어 이 모듈 은 모든 텍스트 파일 내용 을 syslog 로 한 줄 씩 이동 합 니 다.
input(type="imfile" tag="kafka" file="analyze.log" ruleset="imfile-kafka"[, Facility=local.7])
6. outputs
actions 라 고도 부 릅 니 다. 동작 을 처리 하 는 형식 은 다음 과 같 습 니 다.
action (
type="omkafka"
topic="kafka_test"
broker="10.120.169.149:9092"
)
7. Rulesets and Rules
Rulesets 는 여러 개의 rule 을 포함 합 니 다. 하나의 규칙 은 rsyslog 가 메 시 지 를 처리 하 는 방식 입 니 다. 모든 규칙 은 filter 와 actions 를 포함 합 니 다.
input(type="imfile" tag="kafka" file="analyze.log" ruleset="rulesetname")
ruleset(name="rulesetname") {
action(type="omfile" file="/path/to/file")
action(type="..." ...)
/* and so on... */
}
input 에 있 는 ruleset 설정 을 통 해 입력 흐름 을 ruleset 에 들 어가 규칙 적 으로 일치 시 킨 다음 action 작업 을 수행 하여 대류 처 리 를 완성 합 니 다.
8. Queue parameters
서로 다른 입력 흐름 을 다른 대기 열 에 들 어가 데 이 터 를 병렬 처리 합 니 다. 보통 ruleset 이나 action 에서 설정 합 니 다. 기본 값 은 하나의 대기 열 만 있 습 니 다.설정 매개 변수 예
action(type="omfwd" target="192.168.2.11" port="10514" protocol="tcp"
queue.filename="forwarding" queue.size="1000000" queue.type="LinkedList"
)
9. templates
이것 은 rsyslog 의 중요 한 특성 입 니 다. 사용자 가 스 트림 형식 을 사용자 정의 할 수 있 고 로그 파일 을 동적 으로 생 성 할 수 있 습 니 다. 기본 값 은 원본 형식 입 니 다.일반 표현 식 은 다음 과 같 습 니 다.
template(parameters) { list-descriptions }
list: 목록 템 플 릿, name, type = "list" 를 포함 하고 여러 constant 와 property 가 맞습니다.template(name="tpl1" type="list") {
constant(value="Syslog MSG is: '")
property(name="msg")
constant(value="', ")
property(name="timereported" dateFormat="rfc3339" caseConversion="lower")
constant(value="
")
}
string: 문자열 사용자 정의 형식 모듈, name, type = "string", string = ", 예 를 들 어
%TIMESTAMP:::date-rfc3339% %HOSTNAME%%syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%"
모든 로그 필드 를 사용자 정의 변수 와 처리 방식 (property replace) 을 통 해 전역 적 으로 읽 을 수 있 는 로그 변 수 를 얻 습 니 다.
주의:
원본 형식: v6 이전 형식,
$template strtpl,"PRI: %pri%, MSG: %msg%
"
. action 의 template 인 자 를 이용 하여 templates 와 action 을 연결 합 니 다. 예 를 들 어 action(template=TEMPLATENAME,type="omfile" file="/var/log/all-msgs.log")
실례nginx access 로 그 를 rsyslog 를 통 해 kafka 로 전송 하 는 인 스 턴 스 를 추가 하고 nginx kafka. conf 를 / etc / rsyslog. d 디 렉 터 리 에 넣 고 rsyslog 를 다시 시작 하면 됩 니 다.
# omkafka imfile
module(load="omkafka")
module(load="imfile")
# nginx template
template(name="nginxAccessTemplate" type="string" string="%hostname%%syslogtag%%msg%
")
# ruleset
ruleset(name="nginx-kafka") {
# kafka
action (
type="omkafka"
template="nginxAccessTemplate"
confParam=["compression.codec=snappy", "queue.buffering.max.messages=400000"]
partitions.number="4"
topic="test_nginx"
broker="10.120.169.149:9092"
queue.spoolDirectory="/tmp"
queue.filename="test_nginx_kafka"
queue.size="360000"
queue.maxdiskspace="2G"
queue.highwatermark="216000"
queue.discardmark="350000"
queue.type="LinkedList"
queue.dequeuebatchsize="4096"
queue.timeoutenqueue="0"
queue.maxfilesize="10M"
queue.saveonshutdown="on"
queue.workerThreads="4"
)
}
# action
input(type="imfile" Tag="nginx,aws" File="/var/log/access.log" Ruleset="nginx-kafka")
conf 파일 이 rsyslogd debug 모드
rsyslogd -dn
를 실행 할 수 있 는 지 확인 하고 로그 출력 결 과 를 보 거나 직접 실행 rsyslogd -N 1
하여 conf 파일 이 올 바른 지 확인 합 니 다.