kafka Streams를 java로 구현해보자!
1. Version
💬
- Kafka : 2.6.0
- grdle : kafka-clients.2.8.1
- grdle : kafka-streams.2.8.1
2. build.gradle
💬 build.gredle dependencies
dependencies {
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.8.1'
compile group: 'org.apache.kafka', name: 'kafka-streams', version: '2.8.1'
}
3. Kafka Streams 개념
💬 link
💬 build.gredle dependencies
dependencies {
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.8.1'
compile group: 'org.apache.kafka', name: 'kafka-streams', version: '2.8.1'
}
3. Kafka Streams 개념
💬 link
4. Kafka Streams 구현
💻 java code
예제는 간단!
- kafka 설치한 서버에 어떠한 시스템이 메세지를 게시하면
그 해당 메세지를 실시간으로 조건 필터하는 기능이다.!필터 조건 : 들어오는 글자 수가 5개 초과만 데이터를 전달한다.
package com.karim.kafkaBasis.kafkaStreams;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class KafkaStreamsBasis {
private static final String KAFKA_SINGLE_IP = "192.168.124.222:9092";
private static final String RECEIVE_TOPIC_NAME = "karim-rcv-topic";
private static final String SEND_TOPIC_NAME = "karim-send-topic";
public static void main(final String[] args) throws Exception {
Properties props = new Properties();
// 카프카 스트림즈를 유일하게 구분할 ID값
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
// 스트림즈에 접근할 카프카 broker 정보
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SINGLE_IP);
// 데이터를 어떤 형식으로 Read/Write 할지 성정 (키/값의 데이터 타입을 지정)
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 스트림 토폴로지를 정의하기 위한 빌더
StreamsBuilder builder = new StreamsBuilder();
// 소스 프로세서 동작 -> RECEIVE_TOPIC_NAME 토픽으로 부터 KStream 객체를 만든다.
KStream<String, String> stringLength5Over = builder.stream(RECEIVE_TOPIC_NAME);
// 스트림 프로세서 동작 ->
// RECEIVE_TOPIC_NAME 토픽에서 가져온 데이터 중
// length 가 5를 넘는 경우의 값만 남도록 필터링 하여 KStream 객체를 새롭게 생성
KStream<String, String> filterStream = stringLength5Over.filter(
((key, value) -> value.length() > 5)
);
// 싱크 프로세서 동작 ->
// SEND_TOPIC_NAME 토픽으로 KStream 데이터를 전달한다.
filterStream.to(SEND_TOPIC_NAME);
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
5. Kafka Console Producer
💬 console view
- 글자수가 5개 초과인
Karim velog cuteeeee-
와 5개 미만인 Kari
를 차례대로 producer한다.
[karim@kafka-single bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic karim-rcv-topic
> Karim velog cuteeeee-
> Kari
6. Kafka Console Consumer
💬 console view
Kari
은 Stream 필터 조건에 만족하지 않으므로 Stream쪽에서 걸러짐!
[karim@kafka-single bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic karim-send-topic
Karim velog cuteeeee-
Karim velog cuteeeee-
와 5개 미만인 Kari
를 차례대로 producer한다.[karim@kafka-single bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic karim-rcv-topic
> Karim velog cuteeeee-
> Kari
💬 console view
Kari
은 Stream 필터 조건에 만족하지 않으므로 Stream쪽에서 걸러짐!
[karim@kafka-single bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic karim-send-topic
Karim velog cuteeeee-
📚 참고
Author And Source
이 문제에 관하여(kafka Streams를 java로 구현해보자!), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@limsubin/kafka-Streams를-java로-구현해보자저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)