Kafka S01/E04로 데이터 흐름 전송 - Grok 표현식으로 로그 파일 분석
카프카 연결 파일 펄스 연결기
앞의 글을 읽었다면 다음 섹션으로 넘어가십시오.
Kafka Connect FilePulse connector은 강력한 소스 코드 커넥터로 데이터를 손쉽게 분석, 변환하고 로컬 파일 시스템에서 Apache Kafka로 데이터를 로드할 수 있습니다.CSV, XML, JSON, LOG4J, AVRO 등 다양한 파일 형식에 대한 내장 지원을 제공합니다.
FilePulse 에 대한 자세한 내용은
커넥터 사용 방법
Kafka-Pulse-Docker는 이미지를 가장 간단하게 연결하는 방법입니다.
$ docker pull streamthoughts/kafka-connect-file-pulse:1.6.3
GitHub 프로젝트 저장소에서 제공하는 docker-compose.yml
파일을 다운로드하여 Kafka Connect와 FilePulse connector가 미리 설치된 융합 플랫폼을 신속하게 시작할 수 있습니다.$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/v1.6.3/docker-compose.yml
$ docker-compose up -d
모든 Docker 컨테이너를 시작하면 커넥터가 http://localhost:8083
을 통해 액세스할 수 있는 Kafka Connect worker에 설치되어 있는지 확인할 수 있습니다.$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse
"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
참고: GitHub Releases Page 또는 Confluent Hub에서도 커넥터를 설치할 수 있습니다.NGINX 시작
먼저 Docker를 사용하여 하나의 HTML 페이지에 서비스를 제공하는 NGINX 인스턴스를 시작합니다.
$ mkdir -p demo/content demo/logs
ìndex.html
이라는 간단한 HTML 페이지도 있습니다.$ cat <<EOF > demo/content/index.html
<!DOCTYPE html>
<html>
<head>
<title>Hi!</title>
</head>
<body>
<h1>Hello World - Kafka Connect FilePulse</h1>
<strong>You can add a Star to this repository to support us! Thank You<a href="https://github.com/streamthoughts/kafka-connect-file-pulse">GitHub</a></strong>
</body>
</html>
EOF
$ docker run --name nginx \
-p 8080:80 \
-v `pwd`/demo/content:/usr/share/nginx/html:ro -d nginx
$ curl -X GET http://localhost:8080
마지막으로, 본고의 나머지 부분을 간소화하기 위해서, 우리는 용기의 stderr와 stdout를 ./demo/logs/nginx.log
파일로 다시 정할 것이다.$ docker logs -f nginx > ./demo/logs/nginx.log 2>&1 &
데이터를 섭취하다
먼저 docker compose에서 시작한 Kafka Connect를 실행하는 컨테이너를 사용하지 않겠습니다.
$ docker stop connect && docker rm connect
그리고nginx에 접근하기 위해 불러온 볼륨이 있는 새 볼륨을 시작합니다.로그 파일.
$ docker stop connect && docker rm connect
$ cat <<EOF > connect-file-pulse-env.list
CONNECT_BOOTSTRAP_SERVERS=localhost:9092
CONNECT_REST_ADVERTISED_HOST_NAME=connect
CONNECT_REST_PORT=8083
CONNECT_GROUP_ID=compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC=docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
CONNECT_OFFSET_STORAGE_TOPIC=docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
CONNECT_STATUS_STORAGE_TOPIC=docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
CONNECT_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://localhost:8081
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_ZOOKEEPER_CONNECT=localhost:2181
CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components/
EOF
$ docker run -it \
--network=host \
--name=connect \
--env-file connect-file-pulse-env.list \
-v `pwd`/demo/logs:/tmp/connect-data \
streamthoughts/kafka-connect-file-pulse:latest
그런 다음 다음 다음 구성을 사용하여 새 커넥터를 생성합니다.$ curl \
-sX PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-log-filepulse-01/config \
-d '{
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
"fs.scan.directory.path": "/tmp/connect-data",
"fs.scan.interval.ms": "10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.log$",
"internal.kafka.reporter.bootstrap.servers": "localhost:9092",
"internal.kafka.reporter.topic": "connect-file-pulse-status",
"offset.strategy": "name",
"read.max.wait.ms": "900000",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
"topic": "connect-file-pulse-nginx-raw-logs",
"tasks.max": 1
}' | jq
마지막으로, 연결기에서 오류가 발생했는지 확인하려면 connect-file-pulse-nginx-raw-logs
이라는 테마를 사용하십시오.로그 파일:$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t connect-file-pulse-nginx-raw-logs \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(산출){
"message": {
"string": "172.17.0.1 - - [05/Jan/2021:10:56:52 +0000] \"GET / HTTP/1.1\" 200 306 \"-\" \"curl/7.58.0\" \"-\""
}
}
$ for i in $(seq 0 100); do curl -sX GET http://localhost:8080 >/dev/null; sleep 1 ; done &
주의: 위의 예에서 우리는 kafkacat을 사용하여 이 주제를 사용했다.옵션 -o-1
최신 메시지만 사용알겠습니다. 방금 사용한 설정을 설명하는 데 시간이 좀 걸립니다.
우선 연결기는 속성
/tmp/connect-data
에 설정된 입력 디렉터리 fs.scan.directory.path
을 정기적으로 스캔하고 패턴 .*\\.log$
과 일치하는 파일을 찾을 것입니다.그리고 각 파일은
offset.strategy
속성의 값에 따라 유일한 표지와 추적을 할 것이다.여기에서 설정 지정 파일은 반드시 name
으로 표시해야 합니다.또는, 예를 들어, 우리는 파일의 inode
을 표지부로 선택할 수 있다.Connect FilePulse는 결합할 수 있는 여러 식별자(예: name+hash
)를 지원합니다.또한 연결기는
RowFileInputReader
을 사용하도록 설정되어 있으며 (참조: task.reader.class
) 줄마다 카프카드 기록을 만들 수 있습니다.RowFileInputReader
의 특징 중 하나는 파일의 끝에 도착한 후에 바로 처리가 끝나지 않지만 시간이 초과될 때까지 기다렸다가 더 많은 바이트를 파일에 쓴다는 것이다.이 동작은 read.max.wait.ms
속성을 통해 구성됩니다.여기서 우리는 15분을 기다린 후에야 문서 처리를 끝낼 수 있다.Grok 표현식을 사용하여 데이터 분석
지금까지 NGINX 로그를 지속적으로 읽을 수 있었습니다.파일에 새 줄을 추가할 때마다 카프카에 message
이라는 텍스트 필드를 포함하는 새 기록을 보냅니다.그러나 각 줄을 분석해 유용한 데이터를 추출하고 구조화된 메시지를 카프카에 생성하는 것이 좋다.
Elastic/ELK Stack, 특히 Logstash 솔루션은 그룹 표현식으로 비구조화된 데이터를 해석하고 의미 있는 필드로 전환하는 것을 보급시켰다.Grok은 정규 표현식 (regex) 위에 위치하고 텍스트 모드를 사용하여 관련 데이터를 일치시킵니다.
Connect FilePulse 는 Grok Expression 의 강력한 기능을 카프카 Connect 에 직접 제공합니다. 엔진 덮개 아래 GrokFilter
라이브러리 Joni regexp 라이브러리의 자바 포트를 사용합니다.그것은 또한 많은 미리 정의되고 다시 사용할 수 있는grok 모드를 제공했다.모드의 전체 목록을 보십시오.
NGINX 액세스 로그 파일의 행과 일치하도록 사용자 정의grok 모드를 정의합니다.
$ cat <<EOF > nginx
NGINX_ACCESS %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] \"%{DATA:request}\" %{INT:status} %{NUMBER:bytes_sent} \"%{DATA:http_referer}\" \"%{DATA:http_user_agent}\"
EOF
주의:grok 모드의 문법은% {syntax: SEMANTIC} 또는% {syntax: SEMANTIC: TYPE}입니다.
그런 다음 이전에 생성한 nginx
파일을 컨테이너에 복사하여 커넥터에서 이 모드를 사용할 수 있도록 해야 합니다.
$ docker exec -it connect mkdir -p /tmp/grok-patterns
$ docker cp nginx connect://tmp/grok-patterns/nginx
그런 다음 다음과 같은 구성으로 새 커넥터를 생성할 수 있습니다.
$ curl \
-sX PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-log-filepulse-02/config \
-d '{
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
"fs.scan.directory.path": "/tmp/connect-data",
"fs.scan.interval.ms": "10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.log$",
"internal.kafka.reporter.bootstrap.servers": "localhost:9092",
"internal.kafka.reporter.topic": "connect-file-pulse-status",
"offset.strategy": "name",
"read.max.wait.ms": "120000",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
"topic": "connect-file-pulse-nginx-parsed-logs",
"tasks.max": 1,
"filters": "ParseLog",
"filters.ParseLog.type": "io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter",
"filters.ParseLog.match": "%{NGINX_ACCESS}",
"filters.ParseLog.overwrite": "message",
"filters.ParseLog.source": "message",
"filters.ParseLog.ignoreFailure": "true",
"filters.ParseLog.patternsDir": "/tmp/grok-patterns"
}' | jq
마지막으로, 출력 테마connect file pulsenginx 해석 로그를 사용하여 추출된 필드를 관찰합니다.
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t connect-file-pulse-nginx-parsed-logs \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(산출)
{
"message": {
"string": "172.17.0.1 - - [05/Jan/2021:13:14:54 +0000] \"GET / HTTP/1.1\" 200 306 \"-\" \"curl/7.58.0\" \"-\""
},
"remote_addr": {
"string": "172.17.0.1"
},
"remote_user": {
"string": "-"
},
"time_local": {
"string": "05/Jan/2021:13:14:54 +0000"
},
"request": {
"string": "GET / HTTP/1.1"
},
"status": {
"string": "200"
},
"bytes_sent": {
"string": "306"
},
"http_referer": {
"string": "-"
},
"http_user_agent": {
"string": "curl/7.58.0"
}
}
앞서 설명한 대로 다음 명령을 실행하여 추가 액세스 로그를 생성할 수 있습니다.
$ for i in $(seq 0 100); do curl -sX GET http://localhost:8080 >/dev/null; sleep 1 ; done
봐라, 이렇게 간단하다니!
대굴마
Kafka Connect SMT와 Grok 표현식 결합
이전 예에서는 Connect FilePulse가 제공하는
기능을 사용했습니다.
그러나 Kafka Connect는 Apache Kafka 0.10(processing filter chain)에 추가된 단일 메시지 변환(SMT)이라는 메커니즘을 추가했습니다.SMT는 Kafka Connect 파이프라인을 통과하는 각 레코드의 데이터를 수정하는 데 사용할 수 있습니다.
좋은 소식!GrokFilter
을 사용하여 수행한 작업을 KIP-66이라는 전용 SMT로 구체화했습니다.
카프카 연결 그로크 변환
결론
우리는 본문에서 로그 파일을 연속적으로 읽고 해석하는 것이 상당히 쉽다는 것을 보았다.Connect File Pulse connector는 GrokFilter
과 함께 Logstash처럼 Grok 표현식으로 비정형 데이터를 분석할 수 있습니다.
일반적으로 Connect File Pulse connector는 Apache Kafka로 데이터를 보내기 전에 데이터를 손쉽게 조작할 수 있는 강력한 솔루션입니다.이 글을 서슴없이 공유해 주세요.만약 당신이 이 프로젝트를 좋아한다면, 하나를 추가하세요⭐ GitHub 저장소에서 지원합니다.감사합니다.
Reference
이 문제에 관하여(Kafka S01/E04로 데이터 흐름 전송 - Grok 표현식으로 로그 파일 분석), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/fhussonnois/streaming-data-into-kafka-s01-e04-loading-log-files-using-grok-expression-59i2
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
$ cat <<EOF > nginx
NGINX_ACCESS %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] \"%{DATA:request}\" %{INT:status} %{NUMBER:bytes_sent} \"%{DATA:http_referer}\" \"%{DATA:http_user_agent}\"
EOF
$ docker exec -it connect mkdir -p /tmp/grok-patterns
$ docker cp nginx connect://tmp/grok-patterns/nginx
$ curl \
-sX PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-log-filepulse-02/config \
-d '{
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
"fs.scan.directory.path": "/tmp/connect-data",
"fs.scan.interval.ms": "10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.log$",
"internal.kafka.reporter.bootstrap.servers": "localhost:9092",
"internal.kafka.reporter.topic": "connect-file-pulse-status",
"offset.strategy": "name",
"read.max.wait.ms": "120000",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
"topic": "connect-file-pulse-nginx-parsed-logs",
"tasks.max": 1,
"filters": "ParseLog",
"filters.ParseLog.type": "io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter",
"filters.ParseLog.match": "%{NGINX_ACCESS}",
"filters.ParseLog.overwrite": "message",
"filters.ParseLog.source": "message",
"filters.ParseLog.ignoreFailure": "true",
"filters.ParseLog.patternsDir": "/tmp/grok-patterns"
}' | jq
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t connect-file-pulse-nginx-parsed-logs \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
{
"message": {
"string": "172.17.0.1 - - [05/Jan/2021:13:14:54 +0000] \"GET / HTTP/1.1\" 200 306 \"-\" \"curl/7.58.0\" \"-\""
},
"remote_addr": {
"string": "172.17.0.1"
},
"remote_user": {
"string": "-"
},
"time_local": {
"string": "05/Jan/2021:13:14:54 +0000"
},
"request": {
"string": "GET / HTTP/1.1"
},
"status": {
"string": "200"
},
"bytes_sent": {
"string": "306"
},
"http_referer": {
"string": "-"
},
"http_user_agent": {
"string": "curl/7.58.0"
}
}
$ for i in $(seq 0 100); do curl -sX GET http://localhost:8080 >/dev/null; sleep 1 ; done
이전 예에서는 Connect FilePulse가 제공하는 기능을 사용했습니다.
그러나 Kafka Connect는 Apache Kafka 0.10(processing filter chain)에 추가된 단일 메시지 변환(SMT)이라는 메커니즘을 추가했습니다.SMT는 Kafka Connect 파이프라인을 통과하는 각 레코드의 데이터를 수정하는 데 사용할 수 있습니다.
좋은 소식!
GrokFilter
을 사용하여 수행한 작업을 KIP-66이라는 전용 SMT로 구체화했습니다.카프카 연결 그로크 변환
결론
우리는 본문에서 로그 파일을 연속적으로 읽고 해석하는 것이 상당히 쉽다는 것을 보았다.Connect File Pulse connector는 GrokFilter
과 함께 Logstash처럼 Grok 표현식으로 비정형 데이터를 분석할 수 있습니다.
일반적으로 Connect File Pulse connector는 Apache Kafka로 데이터를 보내기 전에 데이터를 손쉽게 조작할 수 있는 강력한 솔루션입니다.이 글을 서슴없이 공유해 주세요.만약 당신이 이 프로젝트를 좋아한다면, 하나를 추가하세요⭐ GitHub 저장소에서 지원합니다.감사합니다.
Reference
이 문제에 관하여(Kafka S01/E04로 데이터 흐름 전송 - Grok 표현식으로 로그 파일 분석), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/fhussonnois/streaming-data-into-kafka-s01-e04-loading-log-files-using-grok-expression-59i2
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
Reference
이 문제에 관하여(Kafka S01/E04로 데이터 흐름 전송 - Grok 표현식으로 로그 파일 분석), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/fhussonnois/streaming-data-into-kafka-s01-e04-loading-log-files-using-grok-expression-59i2텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)