IoT 센서 시뮬레이션, Kafka를 사용하여 실시간으로 데이터 처리, Elasticsearch에 저장

이전에 Apache Spark ML을 사용하여 예측 유지 관리 문제를 해결하는 프로젝트를 수행했습니다.

이제 Apache Kafka를 사용하여 실시간으로 그 감각 데이터를 수집하는 추가 프로젝트를 수행했습니다.

코드는 이 Github 저장소here에 있습니다.

먼저 TelemetryPoller 클래스를 생성했습니다. 임의의 전압 판독값을 시뮬레이트하고 생성해야 합니다. TelemetryRunnable 하위 클래스가 해당 부분을 처리합니다.


private Long produceSensorReading() {
    Random random = new Random();
    long leftLimit = 800L;
    long rightLimit = 1500L;

    long number = leftLimit + (long) (Math.random() * (rightLimit - leftLimit));

    // simulate reading time
    try {
        Thread.sleep(random.nextInt(1000));
    } catch (InterruptedException e) {
        logger.info("SensorReading generation interrupted!");
    }
    return number;
}


무작위로 전압 값을 생성합니다. 센서를 시뮬레이트하기 위해 약간의 시간 지연을 추가했습니다Thread.sleep(random.nextInt(1000)).

마찬가지로 임의의 컴퓨터 ID를 생성하는 방법 produceMachineId 이 있습니다(프로젝트에 100개의 컴퓨터가 있음).

그런 다음 다른 스레드에 대한 ProducerRunnable 을 만들었습니다. 스레드 안전LinkedBlockingQueue은 여러 스레드에서 공유됩니다.

Kafka 생산자 구성:

# Kafka Producer properties
topic=voltage-reading
bootstrap.servers=127.0.0.1:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# safe producer (Kafka >= 0.11)
enable.idempotence=true
acks=all
retries=2147483647
# if max.in.flight.requests.per.connection=5 Kafka >= 1.1 else 1
max.in.flight.requests.per.connection=5
# high throughput
compression.type=snappy
linger.ms=20
# (32*1024)
batch.size=32768


이름이 voltage-reading 인 주제를 만들었습니다. 내 로컬 컴퓨터를 사용하고 있으므로 부트스트랩 서버는 127.0.0.1:9092 입니다.

Kafka가 동일한 데이터를 여러 번 커밋하는 것을 원하지 않으므로 enable.idempotencetrue입니다. acks , retriesmax.in.flight.requests.per.connection 는 멱등 생산자에 대한 기본 모드입니다.

작은 지연linger.ms=20을 추가하고 batch.size를 늘려 더 나은 메시지 압축에 좋은 높은 처리량을 얻을 수 있습니다. Snappy는 JSON 압축에 적합한 옵션입니다.

결과는 기계 ID를 포함한 전압 값의 흐름입니다.



이제 Kafka 소비자와 Elasticsearch 클라이언트가 필요합니다.

ElasticsearchConsumer 클래스에서 만들었습니다.

먼저 설정에 대해 조금 설명하고 싶습니다.

# Kafka Consumer properties
topic=voltage-reading
bootstrap.servers=127.0.0.1:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=voltage-reading-elasticsearch
auto.offset.reset=latest
enable.auto.commit=false


다시 localhost에서 실행 중이므로 부트스트랩 서버는 127.0.0.1:9092 입니다. Kafka 소비자는 그룹 ID가 필요하거나 임의의 ID를 할당합니다. 배치를 수동으로 커밋하기 때문에 latest 메시지가 표시되고 enable.auto.commit가 거짓입니다. 수동 커밋을 해서 at least once 속성을 확보하겠지만, Elasticsearch 인덱스에 문서 중복에 주의가 필요합니다. 그래서 추가로 카프카 레코드의 토픽명, 파티션 번호, 오프셋 번호를 이용하여 고유한 id를 생성하고 있습니다.

String id = record.topic() + "_" + record.partition() + '_' + record.offset();


중복을 방지하기 위해 이 ID를 Elasticsearch 클라이언트에 제공하고 있습니다.

마지막으로 while 루프:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    BulkRequest bulkRequest = new BulkRequest();

    for (ConsumerRecord<String, String> record : records) {
        // kafka generic id to prevent document duplication
        String id = record.topic() + "_" + record.partition() + '_' + record.offset();
        logger.info("Consumer reads: " + record.value());

        try {
            // insert data into elasticsearch
            IndexRequest indexRequest = new IndexRequest("machine-telemetry")
                    .id(id)
                    .source(record.value(), XContentType.JSON);

            bulkRequest.add(indexRequest);
        } catch (NullPointerException e) { // skip bad data
            logger.warn("Skipping bad data: " + record.value());
        }
    }

    if (records.count() > 0) {
        BulkResponse bulkItemResponses = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        consumer.commitSync();
    }
}


더 나은 성능을 위해 BulkRequest를 사용하고 있으므로 클라이언트는 for 루프의 각 레코드를 통과할 때까지 기다립니다. bulk 작업 후 Elasticsearch에 입력된 메시지를 수동으로 커밋해야 합니다. - consumer.commitSync() .

그리고 ElasticsearchConsumer 클래스가 실행 중일 때 스트리밍 데이터가 소비되고 있고 Elasticsearch에 있는 것을 볼 수 있습니다.



요약



이 프로젝트에서는 마치 IoT 센서에서 생성되는 것처럼 임의의 전압 값을 생성하는 시뮬레이터를 만들었습니다. 그런 다음 Kafka Producer를 만들어 해당 스트림을 Kafka(브로커)로 보냅니다. 마지막으로 Kafka Consumer를 사용하여 해당 스트림을 Elasticsearch에 넣었습니다.

생산자를 멱등성(브로커에서 중복되지 않음)으로 만들어서 안전하게 만들었습니다. 또한 JSON 문자열 데이터를 생산하고 소비하고 있기 때문에 Snappy라는 효율적인 압축 알고리즘을 적용했습니다. 배치가 클수록 압축이 더 효율적이기 때문에 약간의 지연을 두고 배치 크기를 16KB에서 32KB로 늘렸습니다.

소비자 측에서는 at least once 동작을 보장하기 위해 자동 커밋을 비활성화하고 메시지를 Elasticseach에 넣은 후 수동으로 커밋을 수행했습니다.

좋은 웹페이지 즐겨찾기