Kafka Streams 입문 실례 1 WordCount
워드 카 운 트 는 빅 데이터 계 의 Hello World 라 고 할 수 있 으 며, Hadoop 이 든 Spark 등 빅 데이터 도구 의 시작 사례 든 첫 번 째 십 중 팔 구 는 워드 카 운 트 라 고 믿 습 니 다.
카 프 카 스 트림 도 예 외 는 아니다.Kafka 메시지 시스템 에 통합 되 는 데이터 실시 간 처리 인터페이스 로 서 WordCount 도 좋 은 입문 사례 가 될 수 있 습 니 다.
실제로 카 프 카 는 공식 적 으로 워드 카운터 의 데모
org.apache.kafka.streams.examples.wordcount.WordCountDemo
를 제 공 했 지만, 직접 한 번 실현 하면 빠 른 입문 을 도 울 수 있다.논리 프로 세 스
기억 해 야 할 것 은 카 프 카 의 데이터 가 모두 그 형태 로 존재 한 다 는 점 이다.
만약 에 우리 의 Kafka 에 topic 가 존재 한다 고 가정 하면 그 중의 데 이 터 는 텍스트 파일 에서 나온다.우 리 는 Kafka Streams Application 을 작성 하여 이 topic 의 데 이 터 를 WordCount 로 계산 하 기 를 원 합 니 다. 대략 절 차 는 다음 과 같 습 니 다.
논리 코드
먼저 Kafka Streams Application 의 ID, Kafka 클 러 스 터 위치 등 을 설정 합 니 다.
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
여 기 는 Kafka Streams DSL 을 사용 합 니 다.DSL 이 제공 하 는 각종 산 자 는 대부분 수 요 를 만족 시 킬 수 있다.
DSL 사용:
StreamsBuilder builder = new StreamsBuilder();
Kafka 원본 topic 에서 데이터 흐름 가 져 오기:
KStream textLines = builder.stream("streams-plaintext-input");
KStream 은 각 데이터 기록 으로 구 성 된 데이터 흐름 을 대표 한다.
KStream 은 하나 이상 의 topic 에서 데 이 터 를 얻 을 수 있 습 니 다.KStream 은 데이터 기록 을 조목조목 변환 하여 다른 KStream, KTable 과 join 작업 을 하거나 aggregate 를 KTable 로 만 들 수 있 습 니 다.
받 은 KStream 에 transformation 과 aggregation 을 진행 합 니 다.
데이터 기록 의 대문자 전 체 를 소문 자로 바 꿉 니 다:
.mapValues(textLine -> textLine.toLowerCase())
각 줄 의 데 이 터 를 빈 칸 으로 나 눕 니 다:
.flatMapValues(textLine -> Arrays.asList(textLine.split(" ")))
value 를 새로운 key 로 사용 하기:
.selectKey((key, word) -> word)
aggregation 작업 전 group by key:
.groupByKey()
각 그룹의 원소 개 수 를 계산 합 니 다:
.count(Materialized.as("Counts"));
결 과 를 얻 은 후 KTable 로 저장:
KTable wordCounts = textLines
.mapValues ...
마지막 으로 대상 topic 를 가 져 옵 니 다. 그 중에서 key 는 String 이 고 value 는 Long 입 니 다.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.