Spark를 사용하여 Elasticsearch에 데이터를 신속하게 기록하는 방법
Spark를 사용하여 Elasticsearch에 데이터를 신속하게 기록하는 방법
데이터가 Elasticsearch에 기록되면 가장 먼저 떠오르는 것은 Logstash이다.Logstash는 간단하고 확장 가능하며 신축 가능한 장점 때문에 많은 사용자들이 받아들인다.그러나 자가 짧고 한 치의 장점도 있다. Logstash도 적용할 수 없는 응용 장면이 있을 것이다. 예를 들어 다음과 같다.
이러한 장면을 만족시키기 위해 많은 학우들이 스파크를 선택하여 스파크 산자를 빌려 데이터 처리를 하고 마지막으로 처리 결과를 Elasticsearch에 기록한다.
우리 부서는 이전에 Spark를 이용하여 Nginx 로그를 분석하고 우리의 웹 서비스 방문 상황을 통계하며 Nginx 로그를 분당 한 번씩 집합하여 최종적으로 결과를 Elasticsearch에 기록한 다음에 Kibana 설정을 이용하여 실시간으로 대시보드를 감시했다.Elasticsearch와 Kibana는 모두 편리하고 실용적이지만 유사한 수요가 갈수록 많아지면서 스파크를 통해 데이터를 어떻게 신속하게 Elasticsearch에 쓰는가가 우리의 큰 문제가 되었다.
오늘 여러분께 데이터의 빠른 쓰기를 실현할 수 있는 블랙 테크놀로지인 워터드롭(Waterdrop)을 추천합니다. 매우 사용하기 쉽고 고성능이며 대량의 데이터에 대응할 수 있는 실시간 데이터 처리 제품으로 스파크에 구축되어 간단하고 사용하기 쉬우며 유연하게 배치되어 개발할 필요가 없습니다.
Kafka to Elasticsearch
Logstash와 마찬가지로 Waterdrop은 다양한 유형의 데이터 입력을 지원합니다. 여기서 가장 흔히 볼 수 있는 Kakfa를 입력원으로 삼아 Waterdrop을 사용하여 데이터를 Elasticsearch에 신속하게 쓰는 방법을 설명합니다.
Log Sample
원본 로그 형식은 다음과 같습니다.
127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"
Elasticsearch Document
1분마다 도메인 이름에 대한 액세스를 집계하고자 합니다. 수집된 데이터에는 다음 필드가 있습니다.
domain String
hostname String
status int
datetime String
count int
Waterdrop with Elasticsearch
다음은 Waterdrop을 통해 Kafka의 데이터를 읽고 데이터를 해석하고 집합하는 방법을 상세히 소개하고, 마지막으로 처리 결과를 Elasticsearch에 기록합니다.
Waterdrop
Waterdrop 역시 매우 풍부한 플러그인을 가지고 있으며 Kafka, HDFS, Hive에서 데이터를 읽고 다양한 데이터 처리를 지원하며 결과를 Elasticsearch, Kudu 또는 Kafka에 기록합니다.
Prerequisites
우선 Waterdrop을 설치해야 합니다. 설치가 매우 간단하고 시스템 환경 변수를 설정할 필요가 없습니다.
다음은 Quick Start를 참조할 수 있는 간단한 단계입니다.
cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.1.1/waterdrop-1.1.1.zip
unzip waterdrop-1.1.1.zip
cd waterdrop-1.1.1
vim config/waterdrop-env.sh
# Spark
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}
Waterdrop Pipeline
Logstash와 마찬가지로, 우리는 Waterdrop Pipeline의 프로필만 작성하면 데이터의 가져오기를 완성할 수 있으며, Logstash를 아는 친구가 Waterdrop 프로필을 빨리 구할 수 있을 것이라고 믿습니다.
구성 파일은 Spark, Input, filter, Output 등 네 가지 섹션으로 구성됩니다.
Spark
이 부분은 스파크의 관련 설정으로 스파크가 실행될 때 필요한 자원 크기를 주로 설정한다.
spark {
spark.app.name = "Waterdrop"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.streaming.batchDuration = 5
}
Input
이 부분에서 데이터 원본을 정의합니다. 다음은 Kafka에서 데이터를 읽는 설정 사례입니다.
kafkaStream {
topics = "waterdrop-es"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "waterdrop_es_group"
consumer.rebalance.max.retries = 100
}
Filter
Filter 섹션에서는 로그를 분할하는 정규 해석, HTTPDATE를 Elasticsearch에서 지원하는 날짜 형식으로 변환하는 시간 변환, Number 형식의 필드 형식 변환, SQL을 통한 데이터 집합 등 일련의 변환을 구성합니다.
filter {
#
# raw_message
grok {
source_field = "raw_message"
pattern = '%{NOTSPACE:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
}
# "dd/MMM/yyyy:HH:mm:ss Z"
# Elasticsearch
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
}
## SQL
sql {
table_name = "access_log"
sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
}
}
Output
마지막으로 우리는 처리된 구조화된 데이터를 Elasticsearch에 쓸 것이다.
output {
elasticsearch {
hosts = ["localhost:9200"]
index = "waterdrop-${now}"
es.batch.size.entries = 100000
index_time_format = "yyyy.MM.dd"
}
}
Running Waterdrop
우리는 상술한 네 부분의 설정을 우리의 프로필로 조합할 것이다
config/batch.conf
. vim config/batch.conf
spark {
spark.app.name = "Waterdrop"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.streaming.batchDuration = 5
}
input {
kafkaStream {
topics = "waterdrop-es"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "waterdrop_es_group"
consumer.rebalance.max.retries = 100
}
}
filter {
#
# raw_message
grok {
source_field = "raw_message"
pattern = '%{IP:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
}
# "dd/MMM/yyyy:HH:mm:ss Z"
# Elasticsearch
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
}
## SQL
sql {
table_name = "access_log"
sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, hostname, status, datetime"
}
}
output {
elasticsearch {
hosts = ["localhost:9200"]
index = "waterdrop-${now}"
es.batch.size.entries = 100000
index_time_format = "yyyy.MM.dd"
}
}
명령을 실행하고 프로필을 지정하고 Waterdrop을 실행하면 데이터를 Elasticsearch에 쓸 수 있습니다.여기서 우리는 현지 모델을 예로 들 수 있다.
./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'
마지막으로 Elasticsearch에 기록된 데이터는 다음과 같습니다. Kibana를 추가하면 웹 서비스의 실시간 모니터링을 실현할 수 있습니다 _.
"_source": {
"domain": "elasticsearch.cn",
"hostname": "localhost",
"status": "200",
"datetime": "2018-11-26T21:54:00.000+08:00",
"count": 26
}
Conclusion
이 글에서 우리는 워터드롭을 통해 카프카의 데이터를 Elasticsearch에 기록하는 방법을 소개했다.하나의 프로필만으로 Spark Application을 신속하게 실행할 수 있으며, 데이터의 처리와 쓰기를 완성할 수 있으며, 코드를 작성하지 않아도 매우 간단하다.
데이터 처리 과정에서 Logstash가 지원할 수 없는 장면이나 Logstah 성능이 기대에 미치지 못할 경우 Waterdrop을 사용하여 문제를 해결할 수 있습니다.
Waterdrop과 Elasticsearch, Kafka, Hadoop이 결합하여 사용하는 더 많은 기능과 사례를 알고 싶으면 프로젝트 메인 페이지에 직접 들어갈 수 있습니다https://github.com/InterestingLab/waterdrop
–Power by InterestingLab
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark Streaming의 통계 소켓 단어 수1. socket 단어 수 통계 TCP 소켓의 데이터 서버에서 수신한 텍스트 데이터의 단어 수입니다. 2. maven 설정 3. 프로그래밍 코드 입력 내용 결과 내보내기...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.