IoT 센서 시뮬레이션, Kafka를 사용하여 실시간으로 데이터 처리, Elasticsearch에 저장
12400 단어 bigdatadatascienceelasticsearchkafka
이제 Apache Kafka를 사용하여 실시간으로 그 감각 데이터를 수집하는 추가 프로젝트를 수행했습니다.
코드는 이 Github 저장소here에 있습니다.
먼저
TelemetryPoller
클래스를 생성했습니다. 임의의 전압 판독값을 시뮬레이트하고 생성해야 합니다. TelemetryRunnable
하위 클래스가 해당 부분을 처리합니다.
private Long produceSensorReading() {
Random random = new Random();
long leftLimit = 800L;
long rightLimit = 1500L;
long number = leftLimit + (long) (Math.random() * (rightLimit - leftLimit));
// simulate reading time
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
logger.info("SensorReading generation interrupted!");
}
return number;
}
무작위로 전압 값을 생성합니다. 센서를 시뮬레이트하기 위해 약간의 시간 지연을 추가했습니다
Thread.sleep(random.nextInt(1000))
.마찬가지로 임의의 컴퓨터 ID를 생성하는 방법
produceMachineId
이 있습니다(프로젝트에 100개의 컴퓨터가 있음).그런 다음 다른 스레드에 대한
ProducerRunnable
을 만들었습니다. 스레드 안전LinkedBlockingQueue
은 여러 스레드에서 공유됩니다.Kafka 생산자 구성:
# Kafka Producer properties
topic=voltage-reading
bootstrap.servers=127.0.0.1:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# safe producer (Kafka >= 0.11)
enable.idempotence=true
acks=all
retries=2147483647
# if max.in.flight.requests.per.connection=5 Kafka >= 1.1 else 1
max.in.flight.requests.per.connection=5
# high throughput
compression.type=snappy
linger.ms=20
# (32*1024)
batch.size=32768
이름이
voltage-reading
인 주제를 만들었습니다. 내 로컬 컴퓨터를 사용하고 있으므로 부트스트랩 서버는 127.0.0.1:9092
입니다.Kafka가 동일한 데이터를 여러 번 커밋하는 것을 원하지 않으므로
enable.idempotence
는 true
입니다. acks
, retries
및 max.in.flight.requests.per.connection
는 멱등 생산자에 대한 기본 모드입니다.작은 지연
linger.ms=20
을 추가하고 batch.size
를 늘려 더 나은 메시지 압축에 좋은 높은 처리량을 얻을 수 있습니다. Snappy는 JSON 압축에 적합한 옵션입니다.결과는 기계 ID를 포함한 전압 값의 흐름입니다.
이제 Kafka 소비자와 Elasticsearch 클라이언트가 필요합니다.
ElasticsearchConsumer
클래스에서 만들었습니다.먼저 설정에 대해 조금 설명하고 싶습니다.
# Kafka Consumer properties
topic=voltage-reading
bootstrap.servers=127.0.0.1:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=voltage-reading-elasticsearch
auto.offset.reset=latest
enable.auto.commit=false
다시 localhost에서 실행 중이므로 부트스트랩 서버는
127.0.0.1:9092
입니다. Kafka 소비자는 그룹 ID가 필요하거나 임의의 ID를 할당합니다. 배치를 수동으로 커밋하기 때문에 latest
메시지가 표시되고 enable.auto.commit
가 거짓입니다. 수동 커밋을 해서 at least once
속성을 확보하겠지만, Elasticsearch 인덱스에 문서 중복에 주의가 필요합니다. 그래서 추가로 카프카 레코드의 토픽명, 파티션 번호, 오프셋 번호를 이용하여 고유한 id를 생성하고 있습니다.String id = record.topic() + "_" + record.partition() + '_' + record.offset();
중복을 방지하기 위해 이 ID를 Elasticsearch 클라이언트에 제공하고 있습니다.
마지막으로 while 루프:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
BulkRequest bulkRequest = new BulkRequest();
for (ConsumerRecord<String, String> record : records) {
// kafka generic id to prevent document duplication
String id = record.topic() + "_" + record.partition() + '_' + record.offset();
logger.info("Consumer reads: " + record.value());
try {
// insert data into elasticsearch
IndexRequest indexRequest = new IndexRequest("machine-telemetry")
.id(id)
.source(record.value(), XContentType.JSON);
bulkRequest.add(indexRequest);
} catch (NullPointerException e) { // skip bad data
logger.warn("Skipping bad data: " + record.value());
}
}
if (records.count() > 0) {
BulkResponse bulkItemResponses = client.bulk(bulkRequest, RequestOptions.DEFAULT);
consumer.commitSync();
}
}
더 나은 성능을 위해
BulkRequest
를 사용하고 있으므로 클라이언트는 for 루프의 각 레코드를 통과할 때까지 기다립니다. bulk
작업 후 Elasticsearch에 입력된 메시지를 수동으로 커밋해야 합니다. - consumer.commitSync()
.그리고
ElasticsearchConsumer
클래스가 실행 중일 때 스트리밍 데이터가 소비되고 있고 Elasticsearch에 있는 것을 볼 수 있습니다.요약
이 프로젝트에서는 마치 IoT 센서에서 생성되는 것처럼 임의의 전압 값을 생성하는 시뮬레이터를 만들었습니다. 그런 다음 Kafka Producer를 만들어 해당 스트림을 Kafka(브로커)로 보냅니다. 마지막으로 Kafka Consumer를 사용하여 해당 스트림을 Elasticsearch에 넣었습니다.
생산자를 멱등성(브로커에서 중복되지 않음)으로 만들어서 안전하게 만들었습니다. 또한 JSON 문자열 데이터를 생산하고 소비하고 있기 때문에 Snappy라는 효율적인 압축 알고리즘을 적용했습니다. 배치가 클수록 압축이 더 효율적이기 때문에 약간의 지연을 두고 배치 크기를 16KB에서 32KB로 늘렸습니다.
소비자 측에서는
at least once
동작을 보장하기 위해 자동 커밋을 비활성화하고 메시지를 Elasticseach에 넣은 후 수동으로 커밋을 수행했습니다.
Reference
이 문제에 관하여(IoT 센서 시뮬레이션, Kafka를 사용하여 실시간으로 데이터 처리, Elasticsearch에 저장), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/musaatlihan/simulate-iot-sensor-use-kafka-to-process-data-in-real-time-save-to-elasticsearch-13c8텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)