Elasticsearch에서 Kafka를 만났어요.

텐센트 클라우드 + 커뮤니티에 오신 것을 환영합니다. 더 많은 텐센트 대량의 기술 실천 건조물을 얻을 수 있습니다.
본고는 Michelmu가 클라우드 + 커뮤니티 칼럼에 발표했다
Elasticsearch는 현재 주류의 전문 검색 엔진으로서 강력한 전문 검색 능력과 높은 확장성을 제외하고 다양한 데이터 원본에 대한 호환 능력도 그의 성공 비결 중의 하나이다.한편, Elasticsearch의 강력한 데이터 원본 호환 능력은 핵심 구성 요소 중 하나인 Logstash에서 비롯되었다. Logstash는 플러그인 형식을 통해 다양한 데이터 원본에 대한 입력과 출력을 실현했다.Kafka는 높은 흡수량의 분포식 구독 메시지 시스템으로 흔히 볼 수 있는 데이터 원본이자 Logstash가 지원하는 수많은 입력과 출력 원본 중 하나이다.본고는 실천적인 측면에서 Logstash Kafka Input 플러그인을 사용하여 Kafka의 데이터를 Elasticsearch로 가져오는 과정을 연구하고자 한다.
Logstash Kafka 플러그인을 사용하여 Kafka 및 Elasticsearch 연결

1 Logstash Kafka input 플러그인 소개


Logstash Kafka Input 플러그인은 Kafka API를 사용하여 Kafka topic에서 데이터 정보를 읽습니다. 사용할 때 Kafka의 버전과 대응하는 플러그인 버전이 일치하는지 주의해야 합니다.이 플러그인은 Kafka를 SSL 및 Kerveros SASL 방식으로 연결할 수 있습니다.또한 이 플러그인은 그룹 관리를 제공하고 기본 오프셋 관리 정책을 사용하여 Kafka topic를 조작합니다.
Logstash는 기본적으로 하나의 단독 그룹을 사용하여 Kafka 메시지를 구독합니다. 모든 Logstash Kafka Consumer는 여러 개의 라인을 사용하여 흡수량을 증가합니다.물론 여러 Logstash 실례가 같은 그룹을 사용할 수도 있습니다_id, 부하를 균형 있게 합니다.또한 Consumer의 개수를 Kafka 구역의 크기로 설정하여 더 좋은 성능을 제공하는 것을 권장합니다.

2 테스트 환경 준비


2.1 Elasticsearch 클러스터 생성


구축 과정을 간소화하기 위해 본고는 텐센트 클라우드 Elasticsearch 서비스를 사용했다.텐센트 클라우드 Elasticsearch 서비스는 Elasticsearch 집단의 신속한 구축을 실현할 수 있을 뿐만 아니라 내장된 Kibana, 집단 모니터링, 전용 주 노드, Ik분사 플러그인 등 기능을 제공하여 Elasticsearch 집단의 창설과 관리 업무를 크게 간소화시켰다.

2.2 Kafka 서비스 만들기


Kafka 서비스의 구축은 텐센트 클라우드 CKafka로 이루어진다.Elasticsearch Service와 마찬가지로 텐센트 클라우드 CKafka는 Kafka 서비스의 신속한 창설을 실현하고 개원 Kafka API(0.9 버전)를 100% 호환할 수 있다.

2.3 서버


Elasticsearch와 Kafka를 준비하는 것 외에 Logstash를 실행하여 Elasticsearch와 Kafka를 연결하는 서버를 준비해야 합니다.본고는 텐센트 클라우드 CVM 서버를 채택하였다.

2.4 고려 사항


1) Elasticsearch, Kafka, 서버를 같은 네트워크에 만들어서 네트워크가 서로 통하도록 해야 한다.본고는 텐센트 클라우드와 관련된 기술 서비스를 사용하기 때문에 Elasticsearch 서비스, CKafka와 CVM을 같은 개인 네트워크(VPC)에서 만들면 된다.
2) Elasticsearch Service, CKafka 및 CVM의 네트워크 주소 및 포트를 확인하여 후속 서비스를 사용할 수 있도록 합니다.
이번 테스트:
서비스
ip
port
Elasticsearch service
192.168.0.8
9200
Ckafka
192.168.13.10
9092
CVM
192.168.0.13
-

3 Logstash를 사용하여 Elasticsearch 및 Kafka 연결


3.1 카프카 준비


참고로 [CKafka 사용 시작]
위의 강좌에 따라
1) kafka_ 만들기es_test topic
2) JDK 설치
3) Kafka 키트 설치
4) producer 및 consumer 검증 kafka 기능 만들기

3.2 Logstash 설치


Logstash의 설치와 사용은 참고할 수 있습니다. [빠른 시작 Logstash]

3.3 Logstash Kafka input 플러그인 구성


kafka_ 만들기test_pipeline.conf 파일의 내용은 다음과 같습니다.
input{
        kafka{
                bootstrap_servers=>"192.168.13.10:9092"
                topics=>["kafka_es_test"]
                group_id=>"logstash_kafka_test"
        }
}
output{
        elasticsearch{
                hosts=>["192.168.0.8:9200"]
        }
}

그중에kafka의 input와elasticsearch의 output를 정의했습니다.
Kafka input 플러그인에 대해 상기 세 가지 매개 변수는 필수 매개 변수입니다. 이외에 플러그인 동작을 조정하는 매개 변수도 있습니다. 예를 들어 다음과 같습니다.
auto_commit_interval_ms는 Consumer가 Kafka에 오프셋을 제출하는 시간 간격을 설정합니다.
consumer_threads는 Consumer의 스레드 수를 설정하는 데 사용됩니다. 기본값은 1입니다. 실제적으로 Kafka Topic 섹션 수와 일치해야 합니다.
fetch_max_wait_ms는 fetch 요청을 기다리는 소비자를 지정합니다_min_bytes의 최장 시간
fetch_min_bytes는 Consumer fetch 요청이 반환될 최소 데이터 양을 지정합니다.
topics_pattern은 정규 구독을 통해 특정한 규칙에 맞는 토픽 그룹에 사용됩니다.
추가 매개변수 참조: [Kafka Input Configuration Options]

3.4 Logstash 시작


다음 작업은 Logstash 루트 디렉토리에서 수행됩니다.
1) 구성 확인
./bin/logstash -f kafka_test_pipeline.conf --config.test_and_exit

오류가 있으면 프롬프트에 따라 구성 파일을 수정합니다.만약 설정이 정확하면 다음과 같은 결과를 얻을 수 있다
Sending Logstash's logs to /root/logstash-5.6.13/logs which is now configured via log4j2.properties
[2018-11-11T15:24:01,598][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.13/modules/netflow/configuration"}
[2018-11-11T15:24:01,603][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.13/modules/fb_apache/configuration"}
Configuration OK
[2018-11-11T15:24:01,746][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

2) Logstash 시작
./bin/logstash -f kafka_test_pipeline.conf --config.reload.automatic

로그에 잘못된 알림이 있는지 관찰하고 제때에 처리합니다

3.4 Kafka Producer 시작


다음 작업은 Kafka 툴킷 루트 디렉토리에서 수행됩니다.
./bin/kafka-console-producer.sh --broker-list 192.168.13.10:9092 --topic kafka_es_test

테스트 데이터 쓰기
This is a message

3.5 Kibana 검증 결과


Kibana에 대한 Elasticsearch에 로그인하여 Dev Tools에서 다음 작업을 수행합니다.
1) 색인 보기
GET _cat/indices

logstash-xxx라는 이름을 볼 수 있습니다.xx.xx의 인덱스가 생성되었습니다.
green open .kibana             QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kb
green open logstash-2018.11.11 DejRdNJVQ1e1MwbyJjJjLw 5 1 1 0 8.7kb 4.3kb

2) 쓰기 데이터 보기
GET logstash-2018.11.11/_search

데이터가 성공적으로 기록되었음을 볼 수 있다
{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "logstash-2018.11.11",
        "_type": "logs",
        "_id": "AWcBsEegMu-Dkjm1ap3H",
        "_score": 1,
        "_source": {
          "message": "This is a message",
          "@version": "1",
          "@timestamp": "2018-11-11T07:33:09.079Z"
        }
      }
    ]
  }
}

4 총결산


Logstash는 Elastic Stack에서 데이터를 수집하고 처리하는 핵심 구성 요소로서 Elasticsearch에 강력한 데이터 원본 호환 능력을 제공합니다.테스트 과정에서 알 수 있듯이 Logstash를 사용하여 kafka와 Elaticsearch의 연결 과정을 실현하는 것은 상당히 간단하고 편리하다.또한 Logstash의 데이터 처리 기능은 이 구조를 사용하는 시스템이 데이터 매핑과 처리에 천연적인 장점을 가진다.
그러나 Logstash를 사용하여 Kafka와 Elasticsearch의 연결을 실현하는 것은 Kafka와 Elasticsearch를 연결하는 유일한 방안이 아니라 또 다른 흔히 볼 수 있는 방안은 Kafka Connect를 사용하는 것이다. "Elasticsearch가 Kafka를 만나면--Kafka Connect"를 참고할 수 있다.
관련 읽기
[일일 과정 추천] 머신러닝 실전!빠른 온라인 광고 업무 시작 및 CTR 관련 지식
이 글은 이미 작가가 텐센트 클라우드 + 커뮤니티에 발표할 수 있는 권한을 부여받았습니다. 더 많은 원문은 클릭
관심 공중호'운가 커뮤니티'를 검색하면 가장 먼저 기술 건화물을 얻을 수 있습니다. 관심 후 1024에게 기술 과정 선물을 드리겠습니다!
대량의 기술 실천 경험은 모두 운가 지역사회에 있다!

좋은 웹페이지 즐겨찾기