Kafka Streams 입문 실례 1 WordCount

3091 단어
WordCount
워드 카 운 트 는 빅 데이터 계 의 Hello World 라 고 할 수 있 으 며, Hadoop 이 든 Spark 등 빅 데이터 도구 의 시작 사례 든 첫 번 째 십 중 팔 구 는 워드 카 운 트 라 고 믿 습 니 다.
카 프 카 스 트림 도 예 외 는 아니다.Kafka 메시지 시스템 에 통합 되 는 데이터 실시 간 처리 인터페이스 로 서 WordCount 도 좋 은 입문 사례 가 될 수 있 습 니 다.
실제로 카 프 카 는 공식 적 으로 워드 카운터 의 데모 org.apache.kafka.streams.examples.wordcount.WordCountDemo 를 제 공 했 지만, 직접 한 번 실현 하면 빠 른 입문 을 도 울 수 있다.
논리 프로 세 스
기억 해 야 할 것 은 카 프 카 의 데이터 가 모두 그 형태 로 존재 한 다 는 점 이다.
만약 에 우리 의 Kafka 에 topic 가 존재 한다 고 가정 하면 그 중의 데 이 터 는 텍스트 파일 에서 나온다.우 리 는 Kafka Streams Application 을 작성 하여 이 topic 의 데 이 터 를 WordCount 로 계산 하 기 를 원 합 니 다. 대략 절 차 는 다음 과 같 습 니 다.
  • Stream 은 원본 topic 에서 각 줄 의 데이터 기록 (형식) -
  • MapValue 는 value 의 모든 텍스트 를 소문 자로 변환 합 니 다. -
  • FlatMapValues 는 빈 칸 에 따라 단어 ---,
  • 로 분해 된다.
  • SelectKey 는 value 의 값 을 key ---,
  • 에 부여 합 니 다.
  • GroupByKey 는 같은 Key 로 그룹 을 나눈다 --- (,), ()
  • Count 는 각 그룹의 원소 개 수 를 계산한다 ---,
  • 결 과 를 Kafka
  • 로 되 돌려 보 내기 위해
    논리 코드
    먼저 Kafka Streams Application 의 ID, Kafka 클 러 스 터 위치 등 을 설정 합 니 다.
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    

    여 기 는 Kafka Streams DSL 을 사용 합 니 다.DSL 이 제공 하 는 각종 산 자 는 대부분 수 요 를 만족 시 킬 수 있다.
    DSL 사용:
    StreamsBuilder builder = new StreamsBuilder();
    

    Kafka 원본 topic 에서 데이터 흐름 가 져 오기:
    KStream textLines = builder.stream("streams-plaintext-input");
    

    KStream 은 각 데이터 기록 으로 구 성 된 데이터 흐름 을 대표 한다.
    KStream 은 하나 이상 의 topic 에서 데 이 터 를 얻 을 수 있 습 니 다.KStream 은 데이터 기록 을 조목조목 변환 하여 다른 KStream, KTable 과 join 작업 을 하거나 aggregate 를 KTable 로 만 들 수 있 습 니 다.
    받 은 KStream 에 transformation 과 aggregation 을 진행 합 니 다.
    데이터 기록 의 대문자 전 체 를 소문 자로 바 꿉 니 다:
    .mapValues(textLine -> textLine.toLowerCase())
    

    각 줄 의 데 이 터 를 빈 칸 으로 나 눕 니 다:
    .flatMapValues(textLine -> Arrays.asList(textLine.split(" ")))
    

    value 를 새로운 key 로 사용 하기:
    .selectKey((key, word) -> word)
    

    aggregation 작업 전 group by key:
    .groupByKey()
    

    각 그룹의 원소 개 수 를 계산 합 니 다:
    .count(Materialized.as("Counts"));
    

    결 과 를 얻 은 후 KTable 로 저장:
    KTable wordCounts = textLines
                                      .mapValues ...
    

    마지막 으로 대상 topic 를 가 져 옵 니 다. 그 중에서 key 는 String 이 고 value 는 Long 입 니 다.
    wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
    
    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();
    

    좋은 웹페이지 즐겨찾기