Spring Cloud를 사용한 기능적 Kafka - 2부

따라서 기본 kafka 환경과 Spring Boot Web API를 설정하여 Kafka에 기본 문자열 메시지를 푸시합니다. 이 기사에서는 값을 읽는 데 시간을 할애하겠습니다.

중단한 부분부터 계속해서 https://start.spring.io 웹 사이트를 사용하여 다른 Spring 애플리케이션을 생성해야 합니다.



이번에는 Spring Web 종속성을 생략할 수 있습니다!

우리는 아직 멋진 것을 추가하지 않을 것입니다. 코드 베이스를 가져와서 새 애플리케이션을 시작할 준비를 하십시오!

스프링 클라우드 설정



먼저 Spring Cloud 종속성을 설정하겠습니다. pom.xml에서 다음과 같이 변경해 보겠습니다.
  • Spring Cloud의 버전을 나타내는 새 속성을 추가합니다.

  • <properties>
        <java.version>18</java.version>
        <spring-cloud-release.version>2021.0.1</spring-cloud-release.version>
    </properties>
    


    다음으로 Spring Cloud POM을 참조하려고 합니다. project 태그 아래에 다음 XML 블록을 추가합니다.

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud-release.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    


    여기에는 Spring Cloud v2021.0.1에서 필요한 모든 종속성이 포함됩니다.

    문자열 기반 소비자의 경우 추가된 2개의 핵심 종속성과 1개의 테스트 종속성이 필요합니다.

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <scope>test</scope>
        <classifier>test-binder</classifier>
        <type>test-jar</type>
    </dependency>
    


    소비자 및 프로세서



    이제 우리는 기본적으로 데이터 파이프라인이 될 코드를 추가할 수 있습니다. 주제에서 메시지를 가져온 다음 이를 다른 주제로 전달하기 전에 이를 사용하여 작업을 수행할 수 있습니다.

    다음 빈으로 새 클래스KafkaConfiguration.java를 만듭니다.

    package com.example.demo;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import reactor.core.publisher.Flux;
    
    import java.util.function.Consumer;
    import java.util.function.Function;
    
    
    @Configuration
    public class KafkaConfiguration {
    
        private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
    
        @Bean
        public Function<Flux<String>, Flux<String>> employeeProcessor() {
            return employee -> employee.map(emp -> {
               log.info("Employee is {}", emp);
               return emp;
            }).log();
        }
    
        @Bean
        public Consumer<String> employeeConsumer() {
            return (value) -> log.info("Consumed {}", value);
        }
    
    }
    


    코드를 살펴봅시다!

    발생하는 값을 출력하는 기본 로거가 있습니다.

    private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
    


    다음으로 주제에서 오는 메시지의 입력/출력 역할을 할 bean이 있습니다.

    @Bean
    public Function<Flux<String>, Flux<String>> employeeProcessor() {
        return employee -> employee.map(emp -> {
           log.info("Employee is {}", emp);
           return emp;
        }).log();
    }
    


    우리의 메서드 서명은 (데이터 입력/데이터 출력) 파이프라인을 나타냅니다.

    Function<Flux<String>, Flux<String>>
    


    여기서 우리는 기본적으로 우리가 읽는 메시지가 문자열( <Flux<String>, )이 될 것이고 문자열( , Flux<String>> )인 메시지를 보낼 것이라고 말하고 있습니다.

    프로세서 본체는 메시지를 읽고 로그에 출력합니다(몇 번!).

    return employee -> employee.map(emp -> {
           log.info("Employee is {}", emp);
           return emp;
        }).log();
    


    일반적으로 다른 출력을 생성하는 수신 데이터로 더 많은 작업을 수행하기 위해 다른 메서드를 호출할 수 있습니다. 지금은 메시지를 있는 그대로 보냅니다.

    다음 빈은 소비자입니다. 지금은 문자열 메시지만 로그아웃합니다.

    @Bean
    public Consumer<String> employeeConsumer() {
        return (value) -> log.info("Consumed {}", value);
    }
    


    이 응용 프로그램을 실행하기 전에 항목이 아직 정의되지 않은 항목이 누락되었음을 알 수 있습니다.

    Spring Cloud Functions를 사용하여 이러한 메시징 앱을 만들 때 이러한 빈을 구성하고 메시지 버스의 기본 토픽에 바인딩하는 바인딩 기능을 따라야 합니다.
    src/main/resources/application.yml 파일을 열고 지금 해봅시다!

    spring:
      cloud:
        function:
          definition: employeeConsumer;employeeProcessor
        stream:
          bindings:
            employeeProcessor-in-0:
              destination: employee
            employeeProcessor-out-0:
              destination: employees-processed
            employeeConsumer-in-0:
              destination: employees-processed
    
          kafka:
            binder:
              producer-properties:
                value.serializer: org.apache.kafka.common.serialization.StringSerializer
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
              consumer-properties:
                value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
    


    먼저 구성에서 함수 정의를 만들었습니다.

    spring:
      cloud:
        function:
          definition: employeeConsumer;employeeProcessor
    


    보시다시피 이들은 방금 정의한 빈의 이름입니다. 나중에 구성에서 참조 지점으로 사용할 것입니다.

    다음으로 실제 Bean-to-Topic 바인딩을 정의했습니다.

    spring:
      cloud:
        stream:
          bindings:
            employeeProcessor-in-0:
              destination: employee
            employeeProcessor-out-0:
              destination: employees-processed
            employeeConsumer-in-0:
              destination: employees-processed
    


    정의( employeeProcessor, employeeConsumer )를 사용하여 그에 따라 주제를 매핑했습니다. 따라서 employeeProcessor에는 2개의 매핑이 있습니다.
  • 메시지 입력: employee
  • 메시지 출력: employees-processed

  • 한편 employeeConsumer에는 바인딩이 1개 있습니다.
  • 메시지 입력: employees-processed

  • 이는 employee 주제에 해당하는 모든 항목이 employeeProcesor 에 의해 선택됨을 의미합니다. employeeProcessor는 메시지를 인쇄한 다음 메시지를 employees-processed 항목으로 보냅니다.
    employees-processed 대기열로 들어오는 모든 메시지는 employeeConsumer에 의해 선택되며 다시 메시지를 로그에 출력합니다.

    마지막으로 구성에서 일반적으로 Kafka에 대한 몇 가지 기본 속성을 정의합니다.

    spring:
      cloud:
        stream:
          kafka:
            binder:
              producer-properties:
                value.serializer: org.apache.kafka.common.serialization.StringSerializer
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
              consumer-properties:
                value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
    


    간단히 말해서, 우리는 메시지가 나오고 토픽으로 들어가는 것을 문자열로 취급합니다!

    애플리케이션 시작



    이제 다음을 사용하여 애플리케이션을 실행할 수 있습니다.

    $ ./mvnw spring-boot:run
    


    이전 기사의 원본 웹 애플리케이션이 여전히 실행 중인 경우 API에서 메시지를 보내고 소비자가 메시지를 선택하는 것을 볼 수 있어야 합니다.

    Employee is Marshall, Andrew
    Consumed Marshall, Andrew
    


    다음 기사에서는 직렬 변환기로 Avro를 소개하고 바이너리 메시지와 스키마 레지스트리를 보내는 방법을 보여줍니다!

    좋은 웹페이지 즐겨찾기