Kafka S01/E04로 데이터 흐름 전송 - Grok 표현식으로 로그 파일 분석

카프카 데이터 흐름 시리즈의 네 번째이자 마지막 문장이다.앞의 세 편의 글에서, 우리는 CSV, XML, JSON 파일의 기록을 Apache Kafka에 불러오는 방법을 보았는데, 코드를 한 줄 작성하지 않아도 된다.이를 위해 우리는 Kafka Connect FilePulse connector을 사용했는데 많은 우수한 기능을 포함하여 데이터를 해석하고 변환했다.



  • 이전 글에서 NGINX 웹 서버의 비구조화된 로그 파일을 구조화된 데이터 필드로 해석하는 방법을 보았습니다.

    카프카 연결 파일 펄스 연결기


    앞의 글을 읽었다면 다음 섹션으로 넘어가십시오.

    Kafka Connect FilePulse connector은 강력한 소스 코드 커넥터로 데이터를 손쉽게 분석, 변환하고 로컬 파일 시스템에서 Apache Kafka로 데이터를 로드할 수 있습니다.CSV, XML, JSON, LOG4J, AVRO 등 다양한 파일 형식에 대한 내장 지원을 제공합니다.
    FilePulse 에 대한 자세한 내용은
  • Kafka Connect FilePulse - One Connector to Ingest them All!
  • 자세한 내용은 documentation here을 참조하십시오.

    커넥터 사용 방법


    Kafka-Pulse-Docker는 이미지를 가장 간단하게 연결하는 방법입니다.
    $ docker pull streamthoughts/kafka-connect-file-pulse:1.6.3
    
    GitHub 프로젝트 저장소에서 제공하는 docker-compose.yml 파일을 다운로드하여 Kafka Connect와 FilePulse connector가 미리 설치된 융합 플랫폼을 신속하게 시작할 수 있습니다.
    $ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/v1.6.3/docker-compose.yml
    $ docker-compose up -d
    
    모든 Docker 컨테이너를 시작하면 커넥터가 http://localhost:8083을 통해 액세스할 수 있는 Kafka Connect worker에 설치되어 있는지 확인할 수 있습니다.
    $ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse
    
    "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
    
    참고: GitHub Releases Page 또는 Confluent Hub에서도 커넥터를 설치할 수 있습니다.

    NGINX 시작


    먼저 Docker를 사용하여 하나의 HTML 페이지에 서비스를 제공하는 NGINX 인스턴스를 시작합니다.
  • 이 프레젠테이션을 위해 디렉터리를 만듭니다.
  • $ mkdir -p demo/content demo/logs
    
  • ...ìndex.html이라는 간단한 HTML 페이지도 있습니다.
  • $ cat <<EOF > demo/content/index.html
    <!DOCTYPE html>
    <html>
      <head>
        <title>Hi!</title>
      </head>
      <body>
        <h1>Hello World - Kafka Connect FilePulse</h1>
        <strong>You can add a Star to this repository to support us! Thank You<a href="https://github.com/streamthoughts/kafka-connect-file-pulse">GitHub</a></strong>
      </body>
    </html>
    EOF
    
  • 에서 다음 명령을 실행하여 NGINX 웹 서버를 시작합니다.
  • $ docker run --name nginx \
    -p 8080:80 \
    -v `pwd`/demo/content:/usr/share/nginx/html:ro -d nginx
    
  • 은 서버가 올바르게 작동하는지 확인합니다.
  • $ curl -X GET http://localhost:8080
    
    마지막으로, 본고의 나머지 부분을 간소화하기 위해서, 우리는 용기의 stderr와 stdout를 ./demo/logs/nginx.log 파일로 다시 정할 것이다.
    $ docker logs -f nginx > ./demo/logs/nginx.log 2>&1 & 
    

    데이터를 섭취하다


    먼저 docker compose에서 시작한 Kafka Connect를 실행하는 컨테이너를 사용하지 않겠습니다.
    $ docker stop connect && docker rm connect
    
    그리고nginx에 접근하기 위해 불러온 볼륨이 있는 새 볼륨을 시작합니다.로그 파일.
  • 에서 Kafka Connect 컨테이너에 설정해야 하는 환경 변수를 정의하는 파일을 만듭니다.
  • $ cat <<EOF > connect-file-pulse-env.list
    CONNECT_BOOTSTRAP_SERVERS=localhost:9092
    CONNECT_REST_ADVERTISED_HOST_NAME=connect
    CONNECT_REST_PORT=8083
    CONNECT_GROUP_ID=compose-connect-group
    CONNECT_CONFIG_STORAGE_TOPIC=docker-connect-configs
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
    CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
    CONNECT_OFFSET_STORAGE_TOPIC=docker-connect-offsets
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
    CONNECT_STATUS_STORAGE_TOPIC=docker-connect-status
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
    CONNECT_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter
    CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://localhost:8081
    CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
    CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
    CONNECT_ZOOKEEPER_CONNECT=localhost:2181
    CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components/
    EOF
    
  • Kafka Connect FilePulse 커넥터를 실행하는 컨테이너를 시작합니다.
  • $ docker run -it \
     --network=host \
     --name=connect \
     --env-file connect-file-pulse-env.list \
     -v `pwd`/demo/logs:/tmp/connect-data \
     streamthoughts/kafka-connect-file-pulse:latest
    
    그런 다음 다음 다음 구성을 사용하여 새 커넥터를 생성합니다.
    $ curl \
        -sX PUT -H "Accept:application/json" \
        -H "Content-Type:application/json" http://localhost:8083/connectors/source-log-filepulse-01/config \
        -d '{
         "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
         "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
         "fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
         "fs.scan.directory.path": "/tmp/connect-data",
         "fs.scan.interval.ms": "10000",
         "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
         "file.filter.regex.pattern":".*\\.log$",
         "internal.kafka.reporter.bootstrap.servers": "localhost:9092",
         "internal.kafka.reporter.topic": "connect-file-pulse-status",
         "offset.strategy": "name",
         "read.max.wait.ms": "900000",
         "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
         "topic": "connect-file-pulse-nginx-raw-logs",
         "tasks.max": 1
    }' | jq
    
    마지막으로, 연결기에서 오류가 발생했는지 확인하려면 connect-file-pulse-nginx-raw-logs이라는 테마를 사용하십시오.로그 파일:
    $ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
            -b localhost:9092 \
            -t connect-file-pulse-nginx-raw-logs \
            -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
    
    (산출)
    {
      "message": {
        "string": "172.17.0.1 - - [05/Jan/2021:10:56:52 +0000] \"GET / HTTP/1.1\" 200 306 \"-\" \"curl/7.58.0\" \"-\""
      }
    }
    
  • 마지막으로, 다음 명령을 실행하여 추가 액세스 로그를 생성할 수 있습니다.
  • $ for i in $(seq 0 100); do curl -sX GET http://localhost:8080 >/dev/null; sleep 1 ; done &
    
    주의: 위의 예에서 우리는 kafkacat을 사용하여 이 주제를 사용했다.옵션 -o-1최신 메시지만 사용
    알겠습니다. 방금 사용한 설정을 설명하는 데 시간이 좀 걸립니다.
    우선 연결기는 속성 /tmp/connect-data에 설정된 입력 디렉터리 fs.scan.directory.path을 정기적으로 스캔하고 패턴 .*\\.log$과 일치하는 파일을 찾을 것입니다.
    그리고 각 파일은 offset.strategy 속성의 값에 따라 유일한 표지와 추적을 할 것이다.여기에서 설정 지정 파일은 반드시 name으로 표시해야 합니다.또는, 예를 들어, 우리는 파일의 inode을 표지부로 선택할 수 있다.Connect FilePulse는 결합할 수 있는 여러 식별자(예: name+hash)를 지원합니다.
    또한 연결기는 RowFileInputReader 을 사용하도록 설정되어 있으며 (참조: task.reader.class) 줄마다 카프카드 기록을 만들 수 있습니다.
    RowFileInputReader 의 특징 중 하나는 파일의 끝에 도착한 후에 바로 처리가 끝나지 않지만 시간이 초과될 때까지 기다렸다가 더 많은 바이트를 파일에 쓴다는 것이다.이 동작은 read.max.wait.ms 속성을 통해 구성됩니다.여기서 우리는 15분을 기다린 후에야 문서 처리를 끝낼 수 있다.

    Grok 표현식을 사용하여 데이터 분석


    지금까지 NGINX 로그를 지속적으로 읽을 수 있었습니다.파일에 새 줄을 추가할 때마다 카프카에 message이라는 텍스트 필드를 포함하는 새 기록을 보냅니다.그러나 각 줄을 분석해 유용한 데이터를 추출하고 구조화된 메시지를 카프카에 생성하는 것이 좋다.
    Elastic/ELK Stack, 특히 Logstash 솔루션은 그룹 표현식으로 비구조화된 데이터를 해석하고 의미 있는 필드로 전환하는 것을 보급시켰다.Grok은 정규 표현식 (regex) 위에 위치하고 텍스트 모드를 사용하여 관련 데이터를 일치시킵니다.
    Connect FilePulse 는 Grok Expression 의 강력한 기능을 카프카 Connect 에 직접 제공합니다. 엔진 덮개 아래 GrokFilter 라이브러리 Joni regexp 라이브러리의 자바 포트를 사용합니다.그것은 또한 많은 미리 정의되고 다시 사용할 수 있는grok 모드를 제공했다.모드의 전체 목록을 보십시오.
    NGINX 액세스 로그 파일의 행과 일치하도록 사용자 정의grok 모드를 정의합니다.
    $ cat <<EOF > nginx
    NGINX_ACCESS %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] \"%{DATA:request}\" %{INT:status} %{NUMBER:bytes_sent} \"%{DATA:http_referer}\" \"%{DATA:http_user_agent}\"
    EOF
    
    주의:grok 모드의 문법은% {syntax: SEMANTIC} 또는% {syntax: SEMANTIC: TYPE}입니다.
    그런 다음 이전에 생성한 nginx 파일을 컨테이너에 복사하여 커넥터에서 이 모드를 사용할 수 있도록 해야 합니다.
    $ docker exec -it connect mkdir -p /tmp/grok-patterns
    $ docker cp nginx connect://tmp/grok-patterns/nginx
    
    그런 다음 다음과 같은 구성으로 새 커넥터를 생성할 수 있습니다.
    $ curl \
        -sX PUT -H "Accept:application/json" \
        -H "Content-Type:application/json" http://localhost:8083/connectors/source-log-filepulse-02/config \
        -d '{
         "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
         "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
         "fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
         "fs.scan.directory.path": "/tmp/connect-data",
         "fs.scan.interval.ms": "10000",
         "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
         "file.filter.regex.pattern":".*\\.log$",
         "internal.kafka.reporter.bootstrap.servers": "localhost:9092",
         "internal.kafka.reporter.topic": "connect-file-pulse-status",
         "offset.strategy": "name",
         "read.max.wait.ms": "120000",
         "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
         "topic": "connect-file-pulse-nginx-parsed-logs",
         "tasks.max": 1,
         "filters": "ParseLog",
         "filters.ParseLog.type": "io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter",
         "filters.ParseLog.match": "%{NGINX_ACCESS}",
         "filters.ParseLog.overwrite": "message",
         "filters.ParseLog.source": "message",
         "filters.ParseLog.ignoreFailure": "true",
         "filters.ParseLog.patternsDir": "/tmp/grok-patterns"
    }' | jq
    
    마지막으로, 출력 테마connect file pulsenginx 해석 로그를 사용하여 추출된 필드를 관찰합니다.
    $ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
            -b localhost:9092 \
            -t connect-file-pulse-nginx-parsed-logs \
            -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
    
    (산출)
    {
      "message": {
        "string": "172.17.0.1 - - [05/Jan/2021:13:14:54 +0000] \"GET / HTTP/1.1\" 200 306 \"-\" \"curl/7.58.0\" \"-\""
      },
      "remote_addr": {
        "string": "172.17.0.1"
      },
      "remote_user": {
        "string": "-"
      },
      "time_local": {
        "string": "05/Jan/2021:13:14:54 +0000"
      },
      "request": {
        "string": "GET / HTTP/1.1"
      },
      "status": {
        "string": "200"
      },
      "bytes_sent": {
        "string": "306"
      },
      "http_referer": {
        "string": "-"
      },
      "http_user_agent": {
        "string": "curl/7.58.0"
      }
    }
    
    앞서 설명한 대로 다음 명령을 실행하여 추가 액세스 로그를 생성할 수 있습니다.
    $ for i in $(seq 0 100); do curl -sX GET http://localhost:8080 >/dev/null; sleep 1 ; done
    
    봐라, 이렇게 간단하다니!

    대굴마 Kafka Connect SMT와 Grok 표현식 결합


    이전 예에서는 Connect FilePulse가 제공하는 기능을 사용했습니다.
    그러나 Kafka Connect는 Apache Kafka 0.10(processing filter chain)에 추가된 단일 메시지 변환(SMT)이라는 메커니즘을 추가했습니다.SMT는 Kafka Connect 파이프라인을 통과하는 각 레코드의 데이터를 수정하는 데 사용할 수 있습니다.
    좋은 소식!GrokFilter을 사용하여 수행한 작업을 KIP-66이라는 전용 SMT로 구체화했습니다.

    카프카 연결 그로크 변환 결론


    우리는 본문에서 로그 파일을 연속적으로 읽고 해석하는 것이 상당히 쉽다는 것을 보았다.Connect File Pulse connector는 GrokFilter과 함께 Logstash처럼 Grok 표현식으로 비정형 데이터를 분석할 수 있습니다.
    일반적으로 Connect File Pulse connector는 Apache Kafka로 데이터를 보내기 전에 데이터를 손쉽게 조작할 수 있는 강력한 솔루션입니다.이 글을 서슴없이 공유해 주세요.만약 당신이 이 프로젝트를 좋아한다면, 하나를 추가하세요⭐ GitHub 저장소에서 지원합니다.감사합니다.

    좋은 웹페이지 즐겨찾기